• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13119627889

03 Feb 2025 05:51PM UTC coverage: 95.996% (-0.5%) from 96.536%
13119627889

Pull #555

github

web-flow
Merge 05f160496 into 5ec2a2015
Pull Request #555: Add linting

75 of 85 new or added lines in 16 files covered. (88.24%)

1 existing line in 1 file now uncovered.

1079 of 1124 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.88
/executorlib/base/executor.py
1
import contextlib
1✔
2
import queue
1✔
3
from concurrent.futures import (
1✔
4
    Executor as FutureExecutor,
5
)
6
from concurrent.futures import (
1✔
7
    Future,
8
)
9
from typing import Callable, Optional, Union
1✔
10

11
from executorlib.standalone.inputcheck import check_resource_dict
1✔
12
from executorlib.standalone.queue import cancel_items_in_queue
1✔
13
from executorlib.standalone.serialize import cloudpickle_register
1✔
14
from executorlib.standalone.thread import RaisingThread
1✔
15

16

17
class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Submitted tasks are placed as dictionaries on an internal queue; the
    thread(s) registered via ``_set_process()`` are presumably the consumers
    of that queue (they are defined outside this class).

    Args:
        max_cores (int): defines the number cores which can be used in parallel
    """

    def __init__(self, max_cores: Optional[int] = None):
        """
        Initialize the ExecutorBase class.

        Args:
            max_cores (int): upper limit for the "cores" entry of a resource dictionary passed to submit();
                             None disables the check.
        """
        # NOTE(review): ind=3 is forwarded unchanged to cloudpickle_register(); its meaning is defined there.
        cloudpickle_register(ind=3)
        self._max_cores = max_cores
        # Both attributes are reset to None by shutdown(), hence the Optional types.
        self._future_queue: Optional[queue.Queue] = queue.Queue()
        self._process: Optional[Union[RaisingThread, list[RaisingThread]]] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        The information is a copy of the keyword arguments of the registered thread (of the first thread when a
        list of threads is registered) without the "future_queue" entry. For a list of threads the number of
        threads is added as "max_workers".

        Returns:
            Optional[dict]: Information about the executor, None when no thread is registered.
        """
        if self._process is None:
            return None
        if isinstance(self._process, list):
            meta_data_dict = self._process[0].get_kwargs().copy()
            meta_data_dict["max_workers"] = len(self._process)
        else:
            meta_data_dict = self._process.get_kwargs().copy()
        # The queue is an internal object and not part of the reported meta data.
        meta_data_dict.pop("future_queue", None)
        return meta_data_dict

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            Optional[queue.Queue]: The future queue, None once shutdown() has been called.
        """
        return self._future_queue

    def submit(
        self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs  # type: ignore
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.

        Raises:
            ValueError: If the requested number of cores exceeds max_cores.
            RuntimeError: If the executor has already been shut down.
        """
        if resource_dict is None:
            resource_dict = {}
        cores = resource_dict.get("cores")
        if (
            cores is not None
            and self._max_cores is not None
            and cores > self._max_cores
        ):
            raise ValueError(
                "The specified number of cores is larger than the available number of cores."
            )
        check_resource_dict(function=fn)
        if self._future_queue is None:
            # In this class the queue is only removed by shutdown(). Handing out a Future which is never queued
            # would block the caller forever on result(), so fail loudly like the concurrent.futures executors.
            raise RuntimeError("cannot schedule new futures after shutdown")
        f: Future = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures and self._future_queue is not None:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None and self._future_queue is not None:
            # The shutdown request travels through the same queue as the tasks.
            self._future_queue.put({"shutdown": True, "wait": wait})
            # Only a single thread is joined here; a list of threads is not waited for in this base class.
            if wait and isinstance(self._process, RaisingThread):
                self._process.join()
                self._future_queue.join()
        # Dropping both references makes repeated calls a no-op.
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor and start it.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The approximate number of items in the future queue, zero after shutdown().
        """
        if self._future_queue is None:
            return 0
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        # Best effort only: attributes may be missing if __init__() failed and the interpreter may already be
        # shutting down, so these errors are ignored on purpose.
        with contextlib.suppress(AttributeError, RuntimeError):
            self.shutdown(wait=False)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc