
pyiron / executorlib / build 12425444448

20 Dec 2024 04:17AM UTC coverage: 95.252% (-0.5%) from 95.793%

Pull Request #529: Add validate_max_workers_slurm method
Merge 133bd5a9c into f1649fab2 (via github / web-flow)

6 of 12 new or added lines in 2 files covered (50.0%).
1 existing line in 1 file is now uncovered.
983 of 1032 relevant lines covered (95.25%).
0.95 hits per line.

Source File

/executorlib/interactive/executor.py (85.88% of lines covered)

from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass


class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            # When plotting the dependency graph, the task is only recorded,
            # not executed, so its future resolves to None immediately.
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed and, in addition, flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points to
                                      the same address as localhost. Still, macOS >= 12 seems to disable this lookup
                                      for security reasons, so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (Callable, optional): optional function to preset arguments for functions which are submitted
                                            later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    # resource_dict is expected to be pre-populated by the caller; "cores" is accessed directly.
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_flux(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        if block_allocation:
            resource_dict["init_function"] = init_function
            # Added in Pull Request #529: validate the requested worker count
            # against the SLURM allocation before spawning workers.
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_slurm(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
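
Neither docstring shows the intended call pattern, so here is a minimal usage sketch for ExecutorWithDependencies. It assumes the "local" backend (which spawns workers via mpiexec) and illustrative resource_dict values; the keys match those documented for create_executor above, but the numbers are arbitrary. Futures returned by submit() can be passed as arguments to later submissions, and execute_tasks_with_dependencies() resolves them before execution.

from executorlib.interactive.executor import ExecutorWithDependencies

def add(x, y):
    return x + y

# Remaining keyword arguments are forwarded to create_executor().
with ExecutorWithDependencies(
    backend="local",
    max_cores=2,
    resource_dict={
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
) as exe:
    future_a = exe.submit(add, 1, 2)
    future_b = exe.submit(add, future_a, 3)  # consumes future_a as an input
    print(future_b.result())  # 6, once both tasks have run

# Passing plot_dependency_graph_filename="graph.png" instead would record the
# task graph and draw it on exit rather than executing the functions.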
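
To show where the validation added by Pull Request #529 fires, here is a sketch of a direct create_executor() call on the slurm_allocation path, again with illustrative values. With block_allocation=True, the worker count is first normalized by validate_number_of_cores() and then checked by validate_max_workers_slurm(); these are among the new lines the report shows as uncovered.

from executorlib.interactive.executor import create_executor

# Sketch only: assumes an active SLURM allocation with at least 8 cores.
exe = create_executor(
    backend="slurm_allocation",
    max_cores=8,
    block_allocation=True,
    resource_dict={
        "cores": 2,                     # MPI cores per worker
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
)
# validate_number_of_cores() is expected to derive max_workers from
# max_cores and cores (here 4); validate_max_workers_slurm() then rejects
# worker counts the allocation cannot satisfy, and the resulting
# InteractiveExecutor submits each worker via srun (SrunSpawner).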