pyiron / executorlib · build 11907763573 (push, via github / web-flow)

19 Nov 2024 06:52AM UTC · coverage: 95.648% (remained the same)
[pre-commit.ci] pre-commit autoupdate (#502)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.7.3 → v0.7.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.7.3...v0.7.4)

* Update environment-openmpi.yml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>

945 of 988 relevant lines covered (95.65%)

0.96 hits per line

Source File: /executorlib/interactive/executor.py · 86.84% covered
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    check_threads_per_core,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
    MpiExecSpawner,
    SrunSpawner,
)
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
except ImportError:  # not covered in this build
    pass


class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping each task hash to its Future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping each task hash to its task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.

    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._generate_dependency_graph = plot_dependency_graph

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(node_lst=node_lst, edge_lst=edge_lst)


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed and, in addition, flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel, just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - defaults to False
                              - slurm_cmd_args (list): additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); defaults to None (Flux only)
        flux_executor_nesting (bool): provide a hierarchically nested Flux job scheduler inside the submitted function
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should resolve its own hostname to the same address as
                                      localhost, but macOS >= 12 seems to disable this lookup for security reasons,
                                      so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"  # not covered in this build
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(  # not covered in this build
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":  # branch body not covered in this build
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
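
A usage sketch for create_executor with the "local" backend may help here. This is an illustrative, untested example against this exact revision: the function name sum_of and all resource_dict values are hypothetical, and the "local" backend assumes mpiexec (used by MpiExecSpawner) is available on the machine.

from executorlib.interactive.executor import create_executor

def sum_of(x, y):  # hypothetical task function
    return x + y

# Keys mirror the resource_dict documented above; values are illustrative.
exe = create_executor(
    backend="local",
    max_cores=2,
    block_allocation=True,  # all resources are fixed on the executor
    resource_dict={
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
)
with exe:
    future = exe.submit(sum_of, 1, 2)
    print(future.result())  # 3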
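
The dependency layer can be sketched the same way. In this hypothetical, untested example, the Future returned by one submit() call is passed as an argument to the next, and the execute_tasks_with_dependencies() thread resolves it before forwarding the task to the wrapped executor. With plot_dependency_graph=True, submit() would instead only record the tasks and __exit__() would draw the collected graph rather than executing anything.

from executorlib.interactive.executor import ExecutorWithDependencies

def add(x, y):  # hypothetical task function
    return x + y

with ExecutorWithDependencies(
    backend="local",
    max_cores=2,
    resource_dict={
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
) as exe:
    a = exe.submit(add, 1, 1)  # independent task
    b = exe.submit(add, a, 1)  # depends on the future `a`
    print(b.result())          # 3, once both tasks have run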