pyiron / executorlib / 13089084080

01 Feb 2025 01:27PM UTC. Coverage: 96.539% (+0.05%) from 96.485%.

Pull Request #551: [minor] restructure create method
Merge e1f9de94d into 565bb32b3 (GitHub, web-flow)

22 of 22 new or added lines in 1 file covered (100.0%).
1 existing line in 1 file now uncovered.
1060 of 1098 relevant lines covered (96.54%).
0.97 hits per line.

Source File: /executorlib/interactive/create.py (93.02% covered)
from typing import Callable, Optional, Union

from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:  # not covered
    pass


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
48
    Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
49
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
50
    executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
51
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
52
    installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
53
    requires the SLURM workload manager to be installed on the system.
54

55
    Args:
56
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
57
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
58
                           recommended, as computers have a limited number of compute cores.
59
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
60
        max_cores (int): defines the number cores which can be used in parallel
61
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
62
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
63
                              - cores (int): number of MPI cores to be used for each function call
64
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
65
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
66
                              - cwd (str/None): current working directory where the parallel python task is executed
67
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
68
                                                              SLURM only) - default False
69
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
70
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
71
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
72
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
73
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
74
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
75
                                      context of an HPC cluster this essential to be able to communicate to an Executor
76
                                      running on a different compute node within the same allocation. And in principle
77
                                      any computer should be able to resolve that their own hostname points to the same
78
                                      address as localhost. Still MacOS >= 12 seems to disable this look up for security
79
                                      reasons. So on MacOS it is required to set this option to true
80
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
81
                                    resource requirements, executorlib supports block allocation. In this case all
82
                                    resources have to be defined on the executor, rather than during the submission
83
                                    of the individual function.
84
        init_function (None): optional function to preset arguments for functions which are submitted later
85
    """
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"  # not covered (the one existing line now uncovered)
    if backend == "flux_allocation":
        check_init_function(
            block_allocation=block_allocation, init_function=init_function
        )
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        cores_per_worker = resource_dict.get("cores", 1)
        resource_dict["cache_directory"] = cache_directory
        resource_dict["hostname_localhost"] = hostname_localhost
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_flux_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            flux_executor=flux_executor,
            flux_executor_pmi_mode=flux_executor_pmi_mode,
            flux_executor_nesting=flux_executor_nesting,
            flux_log_files=flux_log_files,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "slurm_allocation":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_slurm_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "local":
        check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_local_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cache_directory=cache_directory,
            resource_dict=resource_dict,
            hostname_localhost=hostname_localhost,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )


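# Build the flux-specific resource dictionary and return either an InteractiveExecutor
# (block allocation) or an InteractiveStepExecutor, both driven by the FluxPythonSpawner.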
def create_flux_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "openmpi_oversubscribe" in resource_dict.keys():
        del resource_dict["openmpi_oversubscribe"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(  # not covered
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )


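# Build the SLURM resource dictionary and return either an InteractiveExecutor
# (block allocation) or an InteractiveStepExecutor, both driven by the SrunSpawner.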
def create_slurm_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_slurm(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(  # not covered
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )
    else:
        return InteractiveStepExecutor(  # not covered
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )


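# Build the local resource dictionary (dropping the SLURM- and GPU-specific keys) and return
# either an InteractiveExecutor (block allocation) or an InteractiveStepExecutor, both driven
# by the MpiExecSpawner.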
def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost

    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "threads_per_core" in resource_dict.keys():
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict.keys():
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
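
Usage sketch (illustrative, not part of the file under coverage): the snippet below calls
create_executor with the "local" backend described in the docstring above. It assumes the
module is importable as executorlib.interactive.create and that the returned executor follows
the standard-library submit()/result() and context-manager API, as the max_workers note in the
docstring suggests; the example function and argument values are placeholder choices, not
prescribed ones.

    from executorlib.interactive.create import create_executor


    def add(a, b):
        return a + b


    if __name__ == "__main__":
        # "local" backend: no workload manager needed, workers are spawned via MpiExecSpawner.
        with create_executor(
            backend="local",
            max_cores=2,
            resource_dict={"cores": 1},  # one MPI core per function call
        ) as exe:
            future = exe.submit(add, 1, 2)
            print(future.result())  # expected output: 3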