• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12483232754

24 Dec 2024 02:59PM UTC coverage: 96.004% (-0.4%) from 96.367%
12483232754

Pull #535

github

web-flow
Merge 1607bb008 into 5cf3ecc7e
Pull Request #535: Add type checking with mypy

117 of 124 new or added lines in 15 files covered. (94.35%)

7 existing lines in 4 files now uncovered.

1033 of 1076 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.94
/executorlib/base/executor.py
1
import queue
1✔
2
from concurrent.futures import (
1✔
3
    Executor as FutureExecutor,
4
)
5
from concurrent.futures import (
1✔
6
    Future,
7
)
8
from typing import Callable, Optional, List, Union
1✔
9

10
from executorlib.standalone.inputcheck import check_resource_dict
1✔
11
from executorlib.standalone.queue import cancel_items_in_queue
1✔
12
from executorlib.standalone.serialize import cloudpickle_register
1✔
13
from executorlib.standalone.thread import RaisingThread
1✔
14

15

16
class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Submitted tasks are wrapped in a dictionary and placed on an internal
    queue; the thread(s) registered via ``_set_process()`` are expected to
    consume that queue. Once ``shutdown()`` has been called the queue is
    dropped and no further tasks can be submitted.

    Args:
        max_cores (int): defines the number cores which can be used in parallel
    """

    def __init__(self, max_cores: Optional[int] = None):
        """
        Initialize the ExecutorBase class.

        Args:
            max_cores (int, optional): upper limit for the ``cores`` entry of a
                resource dictionary passed to ``submit()``. ``None`` disables
                the check.
        """
        # NOTE(review): presumably registers the calling module with
        # cloudpickle for pickling by value - confirm the meaning of ``ind``.
        cloudpickle_register(ind=3)
        self._max_cores = max_cores
        # Both attributes are reset to None by shutdown().
        self._future_queue: Optional[queue.Queue] = queue.Queue()
        self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None

    @staticmethod
    def _get_meta_data(process) -> dict:
        """
        Copy the keyword arguments of a worker and drop the ``future_queue`` entry.

        Args:
            process: object providing ``get_kwargs()``, which returns a dictionary.

        Returns:
            dict: shallow copy of the keyword arguments without ``future_queue``.
        """
        meta_data_dict = process.get_kwargs().copy()
        meta_data_dict.pop("future_queue", None)
        return meta_data_dict

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        For a list of workers the keyword arguments of the first worker are
        reported together with ``max_workers``, the number of workers.

        Returns:
            Optional[dict]: Information about the executor, or None when no
                worker is attached (no process set, or an empty worker list).
        """
        process = self._process
        if process is None:
            return None
        if isinstance(process, list):
            if not process:
                # Nothing to report; avoids an IndexError on process[0].
                return None
            meta_data_dict = self._get_meta_data(process[0])
            meta_data_dict["max_workers"] = len(process)
            return meta_data_dict
        return self._get_meta_data(process)

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            Optional[queue.Queue]: The future queue, or None after shutdown.
        """
        return self._future_queue

    def submit(self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:  # type: ignore
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Defaults to a new empty dictionary for every call.
                                  Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.

        Raises:
            RuntimeError: If the executor has already been shut down.
            ValueError: If the requested number of cores exceeds ``max_cores``.
        """
        if self._future_queue is None:
            # shutdown() dropped the queue: a Future handed out now could never
            # be picked up by a worker and would block its caller forever.
            raise RuntimeError("cannot schedule new futures after shutdown")
        # A fresh dictionary per call - a shared default object would be
        # visible to (and mutable by) every queued task.
        if resource_dict is None:
            resource_dict = {}
        cores = resource_dict.get("cores", None)
        if (
            cores is not None
            and self._max_cores is not None
            and cores > self._max_cores
        ):
            raise ValueError(
                "The specified number of cores is larger than the available number of cores."
            )
        check_resource_dict(function=fn)
        f: Future = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures and self._future_queue is not None:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None and self._future_queue is not None:
            # The shutdown request travels through the same queue as the tasks.
            self._future_queue.put({"shutdown": True, "wait": wait})
            # Only a single thread is joined here; a list of threads is not
            # waited for by this base implementation.
            if wait and isinstance(self._process, RaisingThread):
                self._process.join()
                self._future_queue.join()
        # Dropping both references marks the executor as shut down and makes
        # repeated calls a no-op.
        self._process = None
        self._future_queue = None

    def _set_process(self, process: "RaisingThread"):
        """
        Set the process for the executor and start it.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The current size of the future queue, 0 after shutdown.
        """
        if self._future_queue is None:
            return 0
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            # Best effort only: the object may be partially initialised or
            # already torn down when it is garbage collected.
            pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc