• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 11749909901

08 Nov 2024 09:35PM UTC coverage: 95.011% (-0.2%) from 95.203%
11749909901

Pull #481

github

web-flow
Merge 9afcb5665 into 213202831
Pull Request #481: Fix working directory

2 of 7 new or added lines in 2 files covered. (28.57%)

895 of 942 relevant lines covered (95.01%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.73
/executorlib/base/executor.py
1
import queue
1✔
2
from concurrent.futures import (
1✔
3
    Executor as FutureExecutor,
4
)
5
from concurrent.futures import (
1✔
6
    Future,
7
)
8
from typing import Optional
1✔
9

10
from executorlib.standalone.inputcheck import check_resource_dict
1✔
11
from executorlib.standalone.queue import cancel_items_in_queue
1✔
12
from executorlib.standalone.serialize import cloudpickle_register
1✔
13
from executorlib.standalone.thread import RaisingThread
1✔
14

15

16
class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Extends ``concurrent.futures.Executor``. Submitted callables are wrapped in a
    task dictionary together with a ``Future`` and placed on an internal
    ``queue.Queue``. The object registered through ``_set_process`` is started
    immediately and is presumably the consumer of that queue (the consumer side
    is not defined in this class).
    """

    def __init__(self):
        """
        Initialize the ExecutorBase class.
        """
        cloudpickle_register(ind=3)
        # Both attributes are reset to None by shutdown(), hence Optional.
        self._future_queue: Optional[queue.Queue] = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        The information is a copy of the ``_kwargs`` of the registered process
        with the ``"future_queue"`` entry removed. If a list of processes is
        registered, the ``_kwargs`` of the first one are used and the number of
        processes is added as ``"max_workers"``.

        Returns:
            Optional[dict]: Information about the executor, or None if no
                process is registered.
        """
        process = self._process
        if process is None:
            return None
        if isinstance(process, list):
            meta_data_dict = process[0]._kwargs.copy()
            meta_data_dict["max_workers"] = len(process)
        else:
            meta_data_dict = process._kwargs.copy()
        # Work on the copy only, the kwargs of the process stay untouched.
        meta_data_dict.pop("future_queue", None)
        return meta_data_dict

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            Optional[queue.Queue]: The future queue, or None once shutdown()
                has been called.
        """
        return self._future_queue

    def submit(
        self, fn: callable, *args, resource_dict: Optional[dict] = None, **kwargs
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Defaults to an empty dictionary when omitted or None.
                                  Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        # A fresh dictionary per call: a shared mutable default would be handed
        # to every queued task, so a mutation by one consumer would leak into
        # all later submissions.
        if resource_dict is None:
            resource_dict = {}
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        # The queue is None after a previous shutdown; the guards keep repeated
        # calls (including cancel_futures=True) from failing on None.
        if cancel_futures and self._future_queue is not None:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None and self._future_queue is not None:
            # Shutdown request sent through the same queue as the tasks.
            self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                self._process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: "RaisingThread"):
        """
        Set the process for the executor.

        The process is stored and started right away.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The (approximate) number of items in the future queue, 0 after
                shutdown() has been called.
        """
        if self._future_queue is None:
            return 0
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        # Best effort only: the instance may be partially initialised or the
        # interpreter may be shutting down, so these errors are ignored on purpose.
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc