• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12425609517

20 Dec 2024 04:35AM UTC coverage: 95.367% (-0.3%) from 95.704%
12425609517

Pull #519

github

web-flow
Merge e930b7919 into f1649fab2
Pull Request #519: Add option to write flux log files

5 of 6 new or added lines in 4 files covered. (83.33%)

20 existing lines in 4 files now uncovered.

988 of 1036 relevant lines covered (95.37%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.9
/executorlib/interactive/executor.py
1
from concurrent.futures import Future
1✔
2
from typing import Any, Callable, Dict, Optional
1✔
3

4
from executorlib.base.executor import ExecutorBase
1✔
5
from executorlib.interactive.shared import (
1✔
6
    InteractiveExecutor,
7
    InteractiveStepExecutor,
8
    execute_tasks_with_dependencies,
9
)
10
from executorlib.interactive.slurm import SrunSpawner
1✔
11
from executorlib.standalone.inputcheck import (
1✔
12
    check_command_line_argument_lst,
13
    check_executor,
14
    check_flux_log_files,
15
    check_gpus_per_worker,
16
    check_init_function,
17
    check_nested_flux_executor,
18
    check_oversubscribe,
19
    check_pmi,
20
    validate_number_of_cores,
21
)
22
from executorlib.standalone.interactive.spawner import MpiExecSpawner
1✔
23
from executorlib.standalone.plot import (
1✔
24
    draw,
25
    generate_nodes_and_edges,
26
    generate_task_hash,
27
)
28
from executorlib.standalone.thread import RaisingThread
1✔
29

30
try:  # The PyFluxExecutor requires flux-base to be installed.
1✔
31
    from executorlib.interactive.flux import FluxPythonSpawner, validate_max_workers
1✔
UNCOV
32
except ImportError:
×
UNCOV
33
    pass
×
34

35

36
class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in. Passing a
            filename enables plotting even if plot_dependency_graph is False. Defaults to None.
        *args: Variable length argument list, forwarded to create_executor().
        **kwargs: Arbitrary keyword arguments, forwarded to create_executor().

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (Optional[str]): Name of the file to store the plotted graph in.

    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        # The inner executor does the actual work; this object only owns the dependency-resolving thread.
        executor = create_executor(*args, **kwargs)
        # Background thread running execute_tasks_with_dependencies; presumably it moves tasks from this
        # executor's queue to the inner executor's queue once their inputs are available - TODO confirm.
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        # An explicit output filename implies that the graph should be generated.
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        When dependency-graph generation is enabled, the task is NOT executed. It is only recorded for
        plotting, and the returned future is already resolved with the value None.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to an
                empty dictionary (a new one for every call).
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if resource_dict is None:
            # Create a fresh dict per call; a shared mutable default would leak state between submissions.
            resource_dict = {}
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            # Graph-only mode: resolve immediately so dependent tasks can be submitted and recorded.
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Shuts down via the base class and, if enabled, draws the recorded dependency graph. Always
        returns None, so exceptions raised inside the with-block propagate.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            # Deliberately not returned: a truthy return value from __exit__ would make the
            # with-statement silently suppress an in-flight exception.
            draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )
156

157

158
def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
):
    """
    Create an interactive executor for the selected backend.

    The returned object is an InteractiveExecutor when block_allocation is True and an
    InteractiveStepExecutor otherwise. The spawner depends on the backend:
    FluxPythonSpawner for "flux_allocation", SrunSpawner for "slurm_allocation" and
    MpiExecSpawner for "local". The flux backend requires flux-base from the flux-framework to be
    installed, plus flux-sched to enable GPU scheduling. The slurm backend requires the SLURM
    workload manager.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local". Passing a flux_executor forces "flux_allocation".
        max_cores (int): defines the number cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Stored in the resource dictionary passed
                                         to the executor. Defaults to None.
        resource_dict (dict): A dictionary of resources required by the task. It is copied, not modified. It must contain
                              the following keys (which keys are consumed depends on the backend):
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an Executor
                                      running on a different compute node within the same allocation. And in principle
                                      any computer should be able to resolve that their own hostname points to the same
                                      address as localhost. Still MacOS >= 12 seems to disable this look up for security
                                      reasons. So on MacOS it is required to set this option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (callable, optional): function to preset arguments for functions which are submitted later

    Returns:
        InteractiveExecutor or InteractiveStepExecutor: the configured executor.

    Raises:
        ValueError: if the backend is not one of "flux_allocation", "slurm_allocation" or "local".
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None:
        # An explicitly provided flux executor always implies the flux allocation backend.
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    if resource_dict is not None:
        # Shallow copy: the branches below add and delete keys. Without the copy the caller's dict
        # is mutated, and reusing it in a second call fails on the ``del`` statements.
        resource_dict = dict(resource_dict)
    # NOTE(review): resource_dict=None still fails here (as before); callers are expected to pass a filled dict.
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        # NOTE(review): FluxPythonSpawner / validate_max_workers are only defined if the optional flux
        # import at module level succeeded; otherwise this branch raises NameError.
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        # These options are validated above but not forwarded to the flux spawner.
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        resource_dict["flux_log_files"] = flux_log_files
        if block_allocation:
            resource_dict["init_function"] = init_function
            max_workers = validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=False,
            )
            validate_max_workers(
                max_workers=max_workers,
                cores=cores_per_worker,
                threads_per_core=resource_dict["threads_per_core"],
            )
            return InteractiveExecutor(
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        # Flux-specific options are validated here (check_* helpers), presumably rejecting non-default values.
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        # Options the local MpiExecSpawner does not take are removed after validation.
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc