pyiron / executorlib — build 12425443902
20 Dec 2024 04:17AM UTC — coverage: 95.252% (-0.5%) from 95.793%

Pull Request #529: Add validate_max_workers_slurm method
Merge ca71e531d into f1649fab2 (committed via github / web-flow)

4 of 12 new or added lines in 2 files covered (33.33%).
7 existing lines in 1 file are now uncovered.
983 of 1032 relevant lines covered (95.25%); 0.95 hits per line.

Source file: /executorlib/interactive/executor.py — 85.88% of lines covered

from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import validate_max_workers as validate_max_workers_slurm
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import validate_max_workers as validate_max_workers_flux
except ImportError:
    pass
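# When flux-base is not installed, the import above fails and is silently
# ignored; FluxPythonSpawner and validate_max_workers_flux remain undefined,
# so selecting backend="flux_allocation" below would raise a NameError.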


class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in. Defaults to None.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    """
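    # Illustrative usage sketch (assumptions: a local backend and the full set
    # of resource_dict keys listed in create_executor below; futures returned
    # by submit() can be passed to later submissions to express dependencies):
    #
    #     def add(a, b):
    #         return a + b
    #
    #     with ExecutorWithDependencies(
    #         backend="local",
    #         resource_dict={
    #             "cores": 1,
    #             "threads_per_core": 1,
    #             "gpus_per_core": 0,
    #             "cwd": None,
    #             "openmpi_oversubscribe": False,
    #             "slurm_cmd_args": [],
    #         },
    #     ) as exe:
    #         step_one = exe.submit(add, 1, 1)
    #         step_two = exe.submit(add, step_one, 1)  # waits for step_one
    #         print(step_two.result())  # 3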

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
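        # Run the dependency-resolution loop in a background thread: tasks
        # submitted to this wrapper are forwarded to the wrapped executor once
        # the Future objects they depend on have completed.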
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    an executorlib.slurm.PySlurmExecutor, or an executorlib.flux.PyFluxExecutor, depending on which backend is
    available. The executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is
    primarily used for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the
    flux-framework to be installed, and in addition flux-sched to enable GPU scheduling. Finally, the
    executorlib.slurm.PySlurmExecutor requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential for communicating with an Executor
                                      running on a different compute node within the same allocation. In principle any
                                      computer should be able to resolve that its own hostname points to the same
                                      address as localhost, but macOS >= 12 seems to disable this lookup for security
                                      reasons, so on macOS this option has to be set to true.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (callable, optional): function to preset arguments for functions which are submitted later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers_flux(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
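            # Added in Pull Request #529: validate the requested number of
            # workers against the SLURM allocation before spawning them.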
            validate_max_workers_slurm(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )