pyiron / executorlib / build 12365729826

17 Dec 2024 03:23AM UTC coverage: 95.409% (-0.3%) from 95.661%

Pull Request #519: Add option to write flux log files
Merge 03659b665 into c3a0ae727

8 of 11 new or added lines in 4 files covered. (72.73%)

3 existing lines in 2 files now uncovered.

956 of 1002 relevant lines covered (95.41%)

0.95 hits per line

Source File

/executorlib/interactive/executor.py (86.42% of lines covered; lines marked "# ×" in the listing below were not hit by the tests)
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
    MpiExecSpawner,
    SrunSpawner,
)
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
except ImportError:  # × not covered
    pass  # × not covered


class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
    dependencies.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        plot_dependency_graph_filename (str, optional): Name of the file to store the plotted graph in.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
        _plot_dependency_graph_filename (Optional[str]): Name of the file to store the plotted graph in.

    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._plot_dependency_graph_filename = plot_dependency_graph_filename
        if plot_dependency_graph_filename is None:
            self._generate_dependency_graph = plot_dependency_graph
        else:
            self._generate_dependency_graph = True

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.

        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.

        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(
                node_lst=node_lst,
                edge_lst=edge_lst,
                filename=self._plot_dependency_graph_filename,
            )


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor, depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed, plus flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the
    SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points to
                                      the same address as localhost. Still, macOS >= 12 seems to disable this lookup
                                      for security reasons, so on macOS it is required to set this option to true.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (callable, optional): optional function to preset arguments for functions which are submitted
                                            later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"  # × not covered
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        resource_dict["flux_log_files"] = flux_log_files
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(  # × not covered
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)  # × not covered
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)  # × not covered
        check_flux_log_files(flux_log_files=flux_log_files)  # × NEW, not covered
        if block_allocation:  # × not covered
            resource_dict["init_function"] = init_function  # × not covered
            return InteractiveExecutor(  # × existing line, now uncovered
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(  # × not covered
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
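
For orientation, here is a minimal usage sketch of the module above. It is not part of the coverage report: the public executorlib.Executor class is the supported entry point, and driving ExecutorWithDependencies directly, as done here, is only an illustration. The resource_dict keys are the ones the "local" branch of create_executor() reads; everything else (the add() helper, the core counts) is hypothetical.

# Hypothetical sketch, assuming executorlib is installed and the "local"
# backend (MpiExecSpawner) is usable on this machine.
from executorlib.interactive.executor import ExecutorWithDependencies


def add(a, b):
    return a + b


# These keys match the ones create_executor() accesses before handing the
# dictionary to the spawner on the "local" branch above.
resource_dict = {
    "cores": 1,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}

with ExecutorWithDependencies(
    max_cores=2,
    backend="local",
    resource_dict=resource_dict,
) as exe:
    future_1 = exe.submit(add, 1, 2)
    # Passing future_1 as an argument makes the second task depend on the
    # first; execute_tasks_with_dependencies() resolves it before execution.
    future_2 = exe.submit(add, future_1, 3)
    print(future_2.result())  # 6

Note that the flux_log_files option introduced by this pull request only takes effect on the flux_allocation branch; the slurm_allocation and local branches merely validate it via check_flux_log_files() (presumably rejecting a true value, in line with the other check_* guards), and the slurm_allocation call is one of the new lines the report flags as uncovered.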