
pyiron / executorlib / 12426443480

20 Dec 2024 06:04AM UTC coverage: 95.315% (-0.5%) from 95.849%
Pull Request #529: Add validate_max_workers_slurm method
Merge e0108425f into f16e67a32

6 of 12 new or added lines in 2 files covered. (50.0%)

1 existing line in 1 file now uncovered.

997 of 1046 relevant lines covered (95.32%)

0.95 hits per line

Source File: /executorlib/interactive/executor.py (85.23% covered)

from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass
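
# Note: if flux-base is not installed, FluxPythonSpawner and
# validate_max_workers_flux remain undefined; the "flux_allocation" branch of
# create_executor() below therefore assumes a working flux installation.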


class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in.

    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f
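
    # Illustrative note: per-call resources can be requested through
    # resource_dict, e.g. exe.submit(fn, x, resource_dict={"cores": 2}),
    # where fn, x and exe are placeholders; multi-core calls require a
    # backend with MPI support.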

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )
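
# Hypothetical usage sketch (illustrative, not part of this module): Future
# objects returned by submit() can be passed as arguments to later
# submissions, and execute_tasks_with_dependencies() resolves them before the
# wrapped function runs. The resource_dict keys follow create_executor()
# below; all names are examples only.
#
#     with ExecutorWithDependencies(
#         backend="local",
#         max_cores=2,
#         resource_dict={
#             "cores": 1,
#             "threads_per_core": 1,
#             "gpus_per_core": 0,
#             "cwd": None,
#             "openmpi_oversubscribe": False,
#             "slurm_cmd_args": [],
#         },
#     ) as exe:
#         step_1 = exe.submit(sum, [1, 2])       # sum([1, 2]) -> 3
#         step_2 = exe.submit(sum, [step_1, 3])  # waits for step_1 -> 6
#         print(step_2.result())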


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    an executorlib.slurm.PySlurmExecutor, or an executorlib.flux.PyFluxExecutor, depending on which backend is
    available. The executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is
    primarily used for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the
    flux-framework to be installed, plus flux-sched to enable GPU scheduling. Finally, the
    executorlib.slurm.PySlurmExecutor requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); defaults to None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points to
                                      the same address as localhost, but macOS >= 12 seems to disable this lookup for
                                      security reasons, so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (callable, optional): optional function to preset arguments for functions which are submitted
                                            later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        resource_dict["flux_log_files"] = flux_log_files
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_flux(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_slurm(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
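
For orientation, here is a minimal usage sketch of create_executor with the "local" backend. This is a hypothetical call, not part of the file above; the resource_dict keys follow the docstring, and all other arguments keep their defaults:

    from executorlib.interactive.executor import create_executor

    exe = create_executor(
        max_cores=2,
        backend="local",
        resource_dict={
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        },
    )
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6

With block_allocation=False (the default) this takes the InteractiveStepExecutor path; switching backend to "slurm_allocation" or "flux_allocation" routes the same call through SrunSpawner or FluxPythonSpawner instead, and with block_allocation=True those branches also run the validate_max_workers checks (validate_max_workers_slurm being the addition of this pull request).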