
pyiron / executorlib / build 13088342861

01 Feb 2025 11:59AM UTC. Coverage: 96.459% (-0.003% from 96.462%).

Pull Request #549: [minor] Fix Initialization for ExecutorWithDependencies
Merge 4198bbb5d into 7f9448a0b (via github web-flow)

1 of 1 new or added line in 1 file covered (100.0%).
4 existing lines in 1 file now uncovered.
1035 of 1073 relevant lines covered (96.46%).
0.96 hits per line.

Source File: /executorlib/interactive/executor.py (93.48% of lines covered)
The four lines that this pull request leaves newly uncovered are marked with "# uncovered in this build" comments in the listing below.
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass

class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        executor (ExecutorBase): The executor that tasks are forwarded to once their dependencies are resolved.
        max_cores (int, optional): The maximum number of cores which can be used in parallel. Defaults to None.
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    """

    def __init__(
        self,
        executor: ExecutorBase,
        max_cores: Optional[int] = None,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ) -> None:
        super().__init__(max_cores=max_cores)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict: dict = {}
        self._task_hash_dict: dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(  # type: ignore
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (Callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)  # type: ignore
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )

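# Usage sketch (illustrative, not part of the source file): futures returned
# by submit() can be passed as arguments to later submissions and are resolved
# before the wrapped executor runs the function, e.g.
#
#     with ExecutorWithDependencies(executor=some_executor) as exe:
#         future_sum = exe.submit(sum, [1, 1])
#         future_total = exe.submit(sum, [future_sum, 1])
#         print(future_total.result())  # prints 3
#
# Here `some_executor` is a placeholder for one of the executors returned by
# create_executor() below. With plot_dependency_graph=True the tasks are only
# recorded (each future's result is set to None) and the dependency graph is
# drawn when the context manager exits.
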
def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> ExecutorBase:
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor, depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed, and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points to
                                      the same address as localhost; still, macOS >= 12 seems to disable this lookup
                                      for security reasons, so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (Callable, optional): optional function to preset arguments for functions which are submitted
                                            later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"  # uncovered in this build
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        if "openmpi_oversubscribe" in resource_dict.keys():
            del resource_dict["openmpi_oversubscribe"]
        if "slurm_cmd_args" in resource_dict.keys():
            del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        resource_dict["flux_log_files"] = flux_log_files
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_flux(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict.get("threads_per_core", 1),
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(  # uncovered in this build
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_slurm(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict.get("threads_per_core", 1),
            )
            return InteractiveExecutor(  # uncovered in this build
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(  # uncovered in this build
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        if "threads_per_core" in resource_dict.keys():
            del resource_dict["threads_per_core"]
        if "gpus_per_core" in resource_dict.keys():
            del resource_dict["gpus_per_core"]
        if "slurm_cmd_args" in resource_dict.keys():
            del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
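
For context, a minimal usage sketch of the factory above (illustrative only, assuming a workstation without SLURM or flux, so the "local" backend applies; submit() and the context-manager protocol are inherited from ExecutorBase):

    from executorlib.interactive.executor import create_executor

    # Request up to two cores locally; block_allocation=False (the default)
    # selects the InteractiveStepExecutor branch shown above.
    with create_executor(
        backend="local",
        max_cores=2,
        resource_dict={"cores": 1},
    ) as exe:
        future = exe.submit(sum, [2, 2])
        print(future.result())  # prints 4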