pyiron / executorlib / build 11907763573

19 Nov 2024 06:52AM UTC. Coverage: 95.648% (remained the same).

Triggered by a push via GitHub (web-flow); commit message:
[pre-commit.ci] pre-commit autoupdate (#502)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.7.3 → v0.7.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.7.3...v0.7.4)

* Update environment-openmpi.yml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>

945 of 988 relevant lines covered (95.65%)

0.96 hits per line
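Those two figures are consistent: 945 / 988 ≈ 0.9565 gives the 95.65% line coverage reported above, and since every covered line in this listing registers exactly one hit, the average works out to the same ratio, roughly 0.96 hits per line.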

Source File

/executorlib/base/executor.py (file coverage: 93.22%)
import queue
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from typing import Optional

from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread


class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Args:
        max_cores (int): defines the number of cores which can be used in parallel
    """

    def __init__(self, max_cores: Optional[int] = None):
        """
        Initialize the ExecutorBase class.

        Args:
            max_cores (Optional[int]): defines the number of cores which can be used in parallel
        """
        cloudpickle_register(ind=3)
        self._max_cores = max_cores
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self) -> queue.Queue:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._future_queue  # not covered in this report

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        cores = resource_dict.get("cores", None)
        if (
            cores is not None
            and self._max_cores is not None
            and cores > self._max_cores
        ):
            raise ValueError(
                "The specified number of cores is larger than the available number of cores."
            )
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # not covered in this report
        if self._process is not None:
            self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                self._process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the number of tasks currently queued in the executor.

        Returns:
            int: The number of queued tasks.
        """
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):  # not covered in this report
            pass
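To illustrate how this base class is meant to be driven, here is a minimal, hypothetical usage sketch, assuming executorlib is installed and that the module path matches the file shown above. SimpleExecutor, the _echo_worker consumer thread, and the add function are illustrative assumptions, not part of executorlib; only the task-dictionary protocol ("fn", "args", "kwargs", "future", "resource_dict") and the {"shutdown": True, "wait": ...} sentinel follow the submit() and shutdown() code above.

import queue
from threading import Thread

# Assumed import path, matching /executorlib/base/executor.py above.
from executorlib.base.executor import ExecutorBase


def _echo_worker(future_queue: queue.Queue):
    """Hypothetical consumer: run queued tasks until the shutdown sentinel arrives."""
    while True:
        task = future_queue.get()
        if task.get("shutdown", False):
            future_queue.task_done()
            break
        try:
            task["future"].set_result(task["fn"](*task["args"], **task["kwargs"]))
        except Exception as error:
            task["future"].set_exception(error)
        future_queue.task_done()


class SimpleExecutor(ExecutorBase):
    """Hypothetical subclass: uses a plain Thread instead of RaisingThread for brevity."""

    def __init__(self, max_cores: int = 1):
        super().__init__(max_cores=max_cores)
        self._set_process(Thread(target=_echo_worker, args=(self._future_queue,)))


def add(a, b):
    return a + b


if __name__ == "__main__":
    exe = SimpleExecutor(max_cores=1)
    future = exe.submit(add, 1, 2, resource_dict={"cores": 1})
    print(future.result())  # 3
    exe.shutdown(wait=True)

The sketch relies only on what the listing shows: submit() queues a task dictionary and returns a Future, and shutdown() pushes the shutdown sentinel before joining the worker and the queue.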