pyiron / executorlib, build 13089119181
01 Feb 2025 01:32PM UTC, coverage: 96.536% (+0.05%) from 96.485%

Pull Request #551: [minor] restructure create method
Merge bfc66d8be into 565bb32b3

23 of 23 new or added lines in 1 file covered (100.0%).
1 existing line in 1 file now uncovered.
1059 of 1097 relevant lines covered (96.54%).
0.97 hits per line.
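The percentages follow directly from the line counts. A quick check of the report's arithmetic in plain Python (nothing from the project assumed):

relevant_lines = 1097
covered_lines = 1059
print(f"{covered_lines / relevant_lines:.2%}")  # 96.54%, matching the figure above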

Source File: /executorlib/interactive/create.py (92.94% covered; uncovered lines are marked "# not covered" in the listing below)
from typing import Callable, Optional, Union

from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:  # not covered: flux-base was importable in this test run
    pass

def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},  # note: this mutable default is shared across calls and mutated below
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
    Instead of returning an executorlib.Executor object this function returns either an
    executorlib.mpi.PyMPIExecutor, executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor,
    depending on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice,
    while the executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
    executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be installed, and in
    addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires
    the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines
                           the number of cores which can be used in parallel - just like the max_cores
                           parameter. Using max_cores is recommended, as computers have a limited number of
                           compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or
                       "local". The default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points
                                      to the same address as localhost. Still, macOS >= 12 seems to disable this
                                      lookup for security reasons, so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (Callable, optional): function to preset arguments for functions which are submitted later
    """
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"  # not covered: the "1 existing line" newly uncovered in this build
    if backend == "flux_allocation":
        check_init_function(
            block_allocation=block_allocation, init_function=init_function
        )
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        resource_dict["cache_directory"] = cache_directory
        resource_dict["hostname_localhost"] = hostname_localhost
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_flux_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            flux_executor=flux_executor,
            flux_executor_pmi_mode=flux_executor_pmi_mode,
            flux_executor_nesting=flux_executor_nesting,
            flux_log_files=flux_log_files,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "slurm_allocation":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_slurm_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "local":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_local_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )

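create_executor is a thin dispatcher: it normalizes the backend string and hands off to one of the three factory functions below. A minimal usage sketch for the "local" backend, assuming the returned executor follows the concurrent.futures submit()/result() protocol (the add function and all resource values here are hypothetical):

from executorlib.interactive.create import create_executor

def add(x, y):
    return x + y

# With the default block_allocation=False the "local" backend returns an
# InteractiveStepExecutor that spawns a worker per task via MpiExecSpawner.
exe = create_executor(backend="local", max_cores=2, resource_dict={"cores": 1})
fut = exe.submit(add, 1, 2)   # assumed: concurrent.futures-style submit()
print(fut.result())           # -> 3
exe.shutdown(wait=True)       # assumed: concurrent.futures-style shutdown()

Passing resource_dict explicitly also sidesteps the shared mutable default flagged in the signature above.
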
def create_flux_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "openmpi_oversubscribe" in resource_dict.keys():
        del resource_dict["openmpi_oversubscribe"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(  # not covered
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )

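The factories can also be called directly. A hedged sketch of the flux_allocation path, assuming flux-base is installed and using the flux.job.FluxExecutor interface named in the docstring above (resource values hypothetical):

import flux.job

from executorlib.interactive.create import create_flux_allocation_executor

# With the default block_allocation=False this takes the InteractiveStepExecutor
# branch, the one the report marks as not covered by the test suite.
with flux.job.FluxExecutor() as flux_exe:
    exe = create_flux_allocation_executor(
        max_cores=2,
        resource_dict={"cores": 1},
        flux_executor=flux_exe,
    )
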
def create_slurm_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_slurm(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(  # not covered
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )
    else:
        return InteractiveStepExecutor(  # not covered
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )

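The SLURM variant validates the worker count against the allocation (validate_max_workers_slurm) before pinning a fixed pool. A sketch assuming an active SLURM allocation; that init_function returns a dict of preset arguments is an assumption based on the docstring, not confirmed by this listing:

from executorlib.interactive.create import create_slurm_allocation_executor

def init():
    return {"offset": 10}  # assumed: preset arguments for later submissions

# max_cores=4 with 2 cores per worker should validate to 2 workers,
# each launched through srun via SrunSpawner.
exe = create_slurm_allocation_executor(
    max_cores=4,
    resource_dict={"cores": 2},
    block_allocation=True,
    init_function=init,
)
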
def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost

    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "threads_per_core" in resource_dict.keys():
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict.keys():
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
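One caveat visible in the listing: create_local_executor mutates the resource_dict it receives, deleting the keys the local MpiExecSpawner does not understand and inserting executor settings in place. A small sketch of that side effect (values hypothetical):

from executorlib.interactive.create import create_local_executor

rd = {"cores": 1, "threads_per_core": 2, "gpus_per_core": 0}
exe = create_local_executor(max_cores=2, resource_dict=rd)
# threads_per_core and gpus_per_core were deleted in place, while
# cache_directory and hostname_localhost were added; rd now holds
# (at least) these keys:
print(sorted(rd))  # ['cache_directory', 'cores', 'hostname_localhost']
exe.shutdown(wait=True)  # assumed: concurrent.futures-style shutdown()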