pyiron / executorlib / 12308565323

13 Dec 2024 02:39AM UTC coverage: 95.386% (-0.3%) from 95.639%

Pull Request #519: Add option to write flux log files
Merge e7371b7a5 into f1c4ffa9c (via GitHub web-flow)

8 of 11 new or added lines in 4 files covered. (72.73%)

4 existing lines in 2 files now uncovered.

951 of 997 relevant lines covered (95.39%)

0.95 hits per line

Source File: /executorlib/interactive/executor.py (85.9% covered)

from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
    execute_tasks_with_dependencies,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
    MpiExecSpawner,
    SrunSpawner,
)
from executorlib.standalone.plot import (
    draw,
    generate_nodes_and_edges,
    generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
except ImportError:
    pass

class ExecutorWithDependencies(ExecutorBase):
    """
    ExecutorWithDependencies extends ExecutorBase with support for executing tasks whose arguments depend on the
    futures of previously submitted tasks.

    Args:
        refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
        plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
        _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
        _generate_dependency_graph (bool): Whether to generate the dependency graph.
    """

    def __init__(
        self,
        *args: Any,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        **kwargs: Any,
    ) -> None:
        super().__init__(max_cores=kwargs.get("max_cores", None))
        executor = create_executor(*args, **kwargs)
        self._set_process(
            RaisingThread(
                target=execute_tasks_with_dependencies,
                kwargs={
                    # Executor Arguments
                    "future_queue": self._future_queue,
                    "executor_queue": executor._future_queue,
                    "executor": executor,
                    "refresh_rate": refresh_rate,
                },
            )
        )
        self._future_hash_dict = {}
        self._task_hash_dict = {}
        self._generate_dependency_graph = plot_dependency_graph

    def submit(
        self,
        fn: Callable[..., Any],
        *args: Any,
        resource_dict: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> Future:
        """
        Submits a task to the executor.

        Args:
            fn (callable): The function to be executed.
            *args: Variable length argument list.
            resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Future: A future object representing the result of the task.
        """
        if not self._generate_dependency_graph:
            f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
        else:
            # Graph mode: record the task for plotting instead of executing it.
            f = Future()
            f.set_result(None)
            task_dict = {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
            task_hash = generate_task_hash(
                task_dict=task_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            self._future_hash_dict[task_hash] = f
            self._task_hash_dict[task_hash] = task_dict
        return f

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> None:
        """
        Exit method called when exiting the context manager.

        Args:
            exc_type: The type of the exception.
            exc_val: The exception instance.
            exc_tb: The traceback object.
        """
        super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
        if self._generate_dependency_graph:
            node_lst, edge_lst = generate_nodes_and_edges(
                task_hash_dict=self._task_hash_dict,
                future_hash_inverse_dict={
                    v: k for k, v in self._future_hash_dict.items()
                },
            )
            return draw(node_lst=node_lst, edge_lst=edge_lst)

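# Illustrative sketch (hypothetical, not part of the original file): with
# plot_dependency_graph=True, submit() above only records each task, and
# __exit__() renders the dependency graph instead of computing results:
#
#     with ExecutorWithDependencies(
#         backend="local", plot_dependency_graph=True, ...
#     ) as exe:
#         f1 = exe.submit(sum, [1, 2])
#         f2 = exe.submit(sum, [f1, 3])  # f1 becomes an edge in the graph
#     # on exit, generate_nodes_and_edges() and draw() plot the two tasks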

def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed and, in addition, flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux_allocation", "slurm_allocation" or "local". The
                       default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (bool): use localhost instead of the hostname to establish the zmq connection. In the
                                   context of an HPC cluster this is essential to be able to communicate with an
                                   Executor running on a different compute node within the same allocation. In
                                   principle any computer should be able to resolve that its own hostname points to
                                   the same address as localhost, but macOS >= 12 seems to disable this lookup for
                                   security reasons, so on macOS this option has to be set to True.
        block_allocation (bool): To accelerate the submission of a series of python functions with the same resource
                                 requirements, executorlib supports block allocation. In this case all resources have
                                 to be defined on the executor, rather than during the submission of the individual
                                 function.
        init_function (Callable, optional): function to preset arguments for functions which are submitted later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict["cores"]
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        resource_dict["flux_executor"] = flux_executor
        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        resource_dict["flux_executor_nesting"] = flux_executor_nesting
        resource_dict["flux_log_files"] = flux_log_files
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=False,
                ),
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=SrunSpawner,
            )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["slurm_cmd_args"]
        if block_allocation:
            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=validate_number_of_cores(
                    max_cores=max_cores,
                    max_workers=max_workers,
                    cores_per_worker=cores_per_worker,
                    set_local_cores=True,
                ),
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                max_workers=max_workers,
                executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )
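
For orientation, a minimal usage sketch of the code above, assuming executorlib is installed. The workload function and core counts are illustrative; the resource_dict keys follow the docstring of create_executor. In practice these classes are typically reached through executorlib's public Executor interface, which fills in the resource_dict defaults.

from executorlib.interactive.executor import ExecutorWithDependencies

def add(x, y):
    return x + y

with ExecutorWithDependencies(
    backend="local",  # run locally via MpiExecSpawner
    max_cores=2,
    resource_dict={
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
) as exe:
    f1 = exe.submit(add, 1, 2)
    f2 = exe.submit(add, f1, 3)  # f1 is resolved to 3 before add() runs
    print(f2.result())  # prints 6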