• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13257409288

11 Feb 2025 06:51AM UTC coverage: 95.592% (-0.1%) from 95.735%
13257409288

Pull #565

github

web-flow
Merge ebd267ae8 into 659d0ded2
Pull Request #565: [feature] Add option to specify number of nodes

8 of 8 new or added lines in 2 files covered. (100.0%)

12 existing lines in 4 files now uncovered.

1106 of 1157 relevant lines covered (95.59%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.38
/executorlib/interfaces/flux.py
1
from typing import Callable, Optional, Union
1✔
2

3
from executorlib.interactive.executor import ExecutorWithDependencies
1✔
4
from executorlib.interactive.shared import (
1✔
5
    InteractiveExecutor,
6
    InteractiveStepExecutor,
7
)
8
from executorlib.standalone.inputcheck import (
1✔
9
    check_command_line_argument_lst,
10
    check_init_function,
11
    check_oversubscribe,
12
    check_plot_dependency_graph,
13
    check_pmi,
14
    check_refresh_rate,
15
    validate_number_of_cores,
16
)
17

18
try:  # The PyFluxExecutor requires flux-base to be installed.
1✔
19
    from executorlib.interactive.flux import FluxPythonSpawner
1✔
20
    from executorlib.interactive.flux import (
1✔
21
        validate_max_workers as validate_max_workers_flux,
22
    )
23
except ImportError:
×
24
    pass
×
25

26

27
class FluxJobExecutor:
    """
    The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
    preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
    the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
    require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
    in an interactive Jupyter notebook.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        max_cores (int): defines the number cores which can be used in parallel
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
                                    requirements, executorlib supports block allocation. In this case all resources have
                                    to be defined on the executor, rather than during the submission of the individual
                                    function.
        init_function (None): optional function to preset arguments for functions which are submitted later
        disable_dependencies (boolean): Disable resolving future objects during the submission.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                      debugging purposes and to get an overview of the specified dependencies.
        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    Examples:
        ```
        >>> import numpy as np
        >>> from executorlib.interfaces.flux import FluxJobExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with FluxJobExecutor(max_workers=2, init_function=init_k) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
        ```
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        flux_executor=None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
        flux_log_files: bool = False,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
        pass

    def __new__(
        cls,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        flux_executor=None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
        flux_log_files: bool = False,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        """
        Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
        executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
        executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
        for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be
        installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
        requires the SLURM workload manager to be installed on the system.

        Args:
            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                               number of cores which can be used in parallel - just like the max_cores parameter. Using
                               max_cores is recommended, as computers have a limited number of compute cores.
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
            max_cores (int): defines the number cores which can be used in parallel
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel python task is executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                                  and SLURM only) - default False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                           only)
            flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
            flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
            flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
            flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                        resource requirements, executorlib supports block allocation. In this case all
                                        resources have to be defined on the executor, rather than during the submission
                                        of the individual function.
            init_function (None): optional function to preset arguments for functions which are submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                          debugging purposes and to get an overview of the specified dependencies.
            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

        """
        # Fallback values for any resource keys the caller did not specify.
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        }
        # Merge defaults into a *fresh* dict (caller-supplied keys win) so the
        # caller's resource_dict is never mutated; the previous implementation
        # update()-ed the defaults into the user's own dictionary.
        resource_dict = {
            **default_resource_dict,
            **(resource_dict if resource_dict is not None else {}),
        }
        if not disable_dependencies:
            # Wrap the executor so Future objects in submitted arguments are
            # resolved before execution (dependency tracking enabled).
            return ExecutorWithDependencies(
                executor=create_flux_executor(
                    max_workers=max_workers,
                    cache_directory=cache_directory,
                    max_cores=max_cores,
                    resource_dict=resource_dict,
                    flux_executor=flux_executor,
                    flux_executor_pmi_mode=flux_executor_pmi_mode,
                    flux_executor_nesting=flux_executor_nesting,
                    flux_log_files=flux_log_files,
                    hostname_localhost=hostname_localhost,
                    block_allocation=block_allocation,
                    init_function=init_function,
                ),
                max_cores=max_cores,
                refresh_rate=refresh_rate,
                plot_dependency_graph=plot_dependency_graph,
                plot_dependency_graph_filename=plot_dependency_graph_filename,
            )
        else:
            # Dependency resolution disabled: graph plotting and a custom
            # refresh rate only make sense with dependency tracking, so reject
            # them explicitly before creating the plain executor.
            check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
            check_refresh_rate(refresh_rate=refresh_rate)
            return create_flux_executor(
                max_workers=max_workers,
                cache_directory=cache_directory,
                max_cores=max_cores,
                resource_dict=resource_dict,
                flux_executor=flux_executor,
                flux_executor_pmi_mode=flux_executor_pmi_mode,
                flux_executor_nesting=flux_executor_nesting,
                flux_log_files=flux_log_files,
                hostname_localhost=hostname_localhost,
                block_allocation=block_allocation,
                init_function=init_function,
            )
226

227

228
class FluxClusterExecutor:
    """
    The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
    preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
    the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
    require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
    in an interactive Jupyter notebook.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        max_cores (int): defines the number cores which can be used in parallel
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
                                    requirements, executorlib supports block allocation. In this case all resources have
                                    to be defined on the executor, rather than during the submission of the individual
                                    function.
        init_function (None): optional function to preset arguments for functions which are submitted later
        disable_dependencies (boolean): Disable resolving future objects during the submission.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                      debugging purposes and to get an overview of the specified dependencies.
        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    Examples:
        ```
        >>> import numpy as np
        >>> from executorlib.interfaces.flux import FluxClusterExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with FluxClusterExecutor(max_workers=2, init_function=init_k) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
        ```
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        pysqa_config_directory: Optional[str] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
        pass

    def __new__(
        cls,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        pysqa_config_directory: Optional[str] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        """
        Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
        executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
        executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
        for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be
        installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
        requires the SLURM workload manager to be installed on the system.

        Args:
            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                               number of cores which can be used in parallel - just like the max_cores parameter. Using
                               max_cores is recommended, as computers have a limited number of compute cores.
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
            max_cores (int): defines the number cores which can be used in parallel
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel python task is executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                                  and SLURM only) - default False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                           only)
            pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                        resource requirements, executorlib supports block allocation. In this case all
                                        resources have to be defined on the executor, rather than during the submission
                                        of the individual function.
            init_function (None): optional function to preset arguments for functions which are submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                          debugging purposes and to get an overview of the specified dependencies.
            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

        """
        # Fallback values for any resource keys the caller did not specify.
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        }
        # Merge defaults into a *fresh* dict (caller-supplied keys win) so the
        # caller's resource_dict is never mutated; the previous implementation
        # update()-ed the defaults into the user's own dictionary.
        resource_dict = {
            **default_resource_dict,
            **(resource_dict if resource_dict is not None else {}),
        }
        if not plot_dependency_graph:
            # Local import: the file-based executor pulls in the pysqa-backed
            # cache machinery, which is only needed on this code path.
            from executorlib.cache.executor import create_file_executor

            return create_file_executor(
                max_workers=max_workers,
                backend="flux_submission",
                max_cores=max_cores,
                cache_directory=cache_directory,
                resource_dict=resource_dict,
                flux_executor=None,
                flux_executor_pmi_mode=None,
                flux_executor_nesting=False,
                flux_log_files=False,
                pysqa_config_directory=pysqa_config_directory,
                hostname_localhost=hostname_localhost,
                block_allocation=block_allocation,
                init_function=init_function,
                disable_dependencies=disable_dependencies,
            )
        else:
            # Graph plotting requires the dependency-tracking wrapper, which in
            # turn drives an interactive flux executor instead of file caching.
            return ExecutorWithDependencies(
                executor=create_flux_executor(
                    max_workers=max_workers,
                    cache_directory=cache_directory,
                    max_cores=max_cores,
                    resource_dict=resource_dict,
                    flux_executor=None,
                    flux_executor_pmi_mode=None,
                    flux_executor_nesting=False,
                    flux_log_files=False,
                    hostname_localhost=hostname_localhost,
                    block_allocation=block_allocation,
                    init_function=init_function,
                ),
                max_cores=max_cores,
                refresh_rate=refresh_rate,
                plot_dependency_graph=plot_dependency_graph,
                plot_dependency_graph_filename=plot_dependency_graph_filename,
            )
418

419

420
def create_flux_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
    Create a flux executor

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                           number of cores which can be used in parallel - just like the max_cores parameter. Using
                           max_cores is recommended, as computers have a limited number of compute cores.
        max_cores (int): defines the number cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                       only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                  context of an HPC cluster this essential to be able to communicate to an
                                  Executor running on a different compute node within the same allocation. And
                                  in principle any computer should be able to resolve that their own hostname
                                  points to the same address as localhost. Still MacOS >= 12 seems to disable
                                  this look up for security reasons. So on MacOS it is required to set this
                                  option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later

    Returns:
        InteractiveStepExecutor/ InteractiveExecutor
    """
    # Work on a shallow copy so the key insertions/removals below never leak
    # back into the caller-supplied resource_dict.
    resource_dict = dict(resource_dict) if resource_dict is not None else {}
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    # init_function is only meaningful with block allocation; reject invalid
    # combinations and flux-incompatible options up front.
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
    check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    # These SLURM/OpenMPI-only keys were consumed by the checks above and must
    # not be forwarded to the flux spawner; pop() with a default replaces the
    # membership-test-then-del pair.
    resource_dict.pop("openmpi_oversubscribe", None)
    resource_dict.pop("slurm_cmd_args", None)
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        # Ensure the requested workers fit into the current flux allocation.
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc