pyiron / executorlib / 13119627889

03 Feb 2025 05:51PM UTC coverage: 95.996% (-0.5%) from 96.536%

Pull Request #555: Add linting (github / web-flow)
Merge 05f160496 into 5ec2a2015

75 of 85 new or added lines in 16 files covered. (88.24%)
1 existing line in 1 file now uncovered.
1079 of 1124 relevant lines covered (96.0%)
0.96 hits per line

Source File: /executorlib/interactive/create.py (89.25% covered)
from typing import Callable, Optional, Union

from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
48
    Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
49
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
50
    executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
51
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
52
    installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
53
    requires the SLURM workload manager to be installed on the system.
54

55
    Args:
56
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
57
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
58
                           recommended, as computers have a limited number of compute cores.
59
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
60
        max_cores (int): defines the number cores which can be used in parallel
61
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
62
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
63
                              - cores (int): number of MPI cores to be used for each function call
64
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
65
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
66
                              - cwd (str/None): current working directory where the parallel python task is executed
67
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
68
                                                              SLURM only) - default False
69
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
70
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
71
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
72
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
73
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
74
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
75
                                      context of an HPC cluster this essential to be able to communicate to an Executor
76
                                      running on a different compute node within the same allocation. And in principle
77
                                      any computer should be able to resolve that their own hostname points to the same
78
                                      address as localhost. Still MacOS >= 12 seems to disable this look up for security
79
                                      reasons. So on MacOS it is required to set this option to true
80
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
81
                                    resource requirements, executorlib supports block allocation. In this case all
82
                                    resources have to be defined on the executor, rather than during the submission
83
                                    of the individual function.
84
        init_function (None): optional function to preset arguments for functions which are submitted later
85
    """
86
    if resource_dict is None:
        resource_dict = {}
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    if backend == "flux_allocation":
        check_init_function(
            block_allocation=block_allocation, init_function=init_function
        )
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        resource_dict["cache_directory"] = cache_directory
        resource_dict["hostname_localhost"] = hostname_localhost
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_flux_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            flux_executor=flux_executor,
            flux_executor_pmi_mode=flux_executor_pmi_mode,
            flux_executor_nesting=flux_executor_nesting,
            flux_log_files=flux_log_files,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "slurm_allocation":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_slurm_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "local":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_local_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )


def create_flux_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
    if resource_dict is None:
        resource_dict = {}
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "openmpi_oversubscribe" in resource_dict:
        del resource_dict["openmpi_oversubscribe"]
    if "slurm_cmd_args" in resource_dict:
        del resource_dict["slurm_cmd_args"]
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )


def create_slurm_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if resource_dict is None:
        resource_dict = {}
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_slurm(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )


def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if resource_dict is None:
        resource_dict = {}
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost

    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "threads_per_core" in resource_dict:
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict:
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict:
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
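
A minimal usage sketch of the factory documented above, not part of the covered source: it assumes executorlib is installed, that the "local" backend and the shown resource_dict values suit the machine at hand, and that the returned executor follows the standard concurrent.futures submit()/result() interface.

from executorlib.interactive.create import create_executor

def add(a, b):
    return a + b

# Hypothetical example values: two cores in total, one MPI core per function call.
exe = create_executor(
    backend="local",             # dispatches to create_local_executor()
    max_cores=2,
    resource_dict={"cores": 1},
)
future = exe.submit(add, 1, 2)   # assumes a concurrent.futures-style submit()
print(future.result())           # expected to print 3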