
pyiron / executorlib / build 12308492040

13 Dec 2024 02:32AM UTC, coverage: 95.286% (-0.4%) from 95.639%

Pull Request #519: Add option to write flux log files (merge 48a8e680f into f1c4ffa9c, via github / web-flow)

7 of 11 new or added lines in 4 files covered (63.64%).

6 existing lines in 3 files now uncovered.

950 of 997 relevant lines covered (95.29%)

0.95 hits per line

Source File: /executorlib/interactive/executor.py (85.9% covered)
Per-line coverage from the report is shown to the left of each source line: 1✔ = hit once, × = not covered; UNCOV flags an existing line left uncovered by this build, NEW flags a line added in this pull request.

  1  1✔      from concurrent.futures import Future
  2  1✔      from typing import Any, Callable, Dict, Optional
  3
  4  1✔      from executorlib.base.executor import ExecutorBase
  5  1✔      from executorlib.interactive.shared import (
  6               InteractiveExecutor,
  7               InteractiveStepExecutor,
  8               execute_tasks_with_dependencies,
  9           )
 10  1✔      from executorlib.standalone.inputcheck import (
 11               check_command_line_argument_lst,
 12               check_executor,
 13               check_gpus_per_worker,
 14               check_init_function,
 15               check_nested_flux_executor,
 16               check_flux_log_files,
 17               check_oversubscribe,
 18               check_pmi,
 19               validate_number_of_cores,
 20           )
 21  1✔      from executorlib.standalone.interactive.spawner import (
 22               MpiExecSpawner,
 23               SrunSpawner,
 24           )
 25  1✔      from executorlib.standalone.plot import (
 26               draw,
 27               generate_nodes_and_edges,
 28               generate_task_hash,
 29           )
 30  1✔      from executorlib.standalone.thread import RaisingThread
 31
 32  1✔      try:  # The PyFluxExecutor requires flux-base to be installed.
 33  1✔          from executorlib.interactive.flux import FluxPythonSpawner
 34  ×       except ImportError:
 35  ×           pass
 36
 37
 38  1✔      class ExecutorWithDependencies(ExecutorBase):
 39               """
 40               ExecutorWithDependencies extends ExecutorBase with support for executing tasks with
 41               dependencies.
 42
 43               Args:
 44                   refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
 45                   plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
 46                   *args: Variable length argument list.
 47                   **kwargs: Arbitrary keyword arguments.
 48
 49               Attributes:
 50                   _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
 51                   _task_hash_dict (Dict[str, Dict]): A dictionary mapping task hash to task dictionary.
 52                   _generate_dependency_graph (bool): Whether to generate the dependency graph.
 53
 54               """
 55
 56  1✔          def __init__(
 57                   self,
 58                   *args: Any,
 59                   refresh_rate: float = 0.01,
 60                   plot_dependency_graph: bool = False,
 61                   **kwargs: Any,
 62               ) -> None:
 63  1✔              super().__init__(max_cores=kwargs.get("max_cores", None))
 64  1✔              executor = create_executor(*args, **kwargs)
 65  1✔              self._set_process(
 66                       RaisingThread(
 67                           target=execute_tasks_with_dependencies,
 68                           kwargs={
 69                               # Executor Arguments
 70                               "future_queue": self._future_queue,
 71                               "executor_queue": executor._future_queue,
 72                               "executor": executor,
 73                               "refresh_rate": refresh_rate,
 74                           },
 75                       )
 76                   )
 77  1✔              self._future_hash_dict = {}
 78  1✔              self._task_hash_dict = {}
 79  1✔              self._generate_dependency_graph = plot_dependency_graph
 80
 81  1✔          def submit(
 82                   self,
 83                   fn: Callable[..., Any],
 84                   *args: Any,
 85                   resource_dict: Dict[str, Any] = {},
 86                   **kwargs: Any,
 87               ) -> Future:
 88                   """
 89                   Submits a task to the executor.
 90
 91                   Args:
 92                       fn (callable): The function to be executed.
 93                       *args: Variable length argument list.
 94                       resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}.
 95                       **kwargs: Arbitrary keyword arguments.
 96
 97                   Returns:
 98                       Future: A future object representing the result of the task.
 99
100                   """
101  1✔              if not self._generate_dependency_graph:
102  1✔                  f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
103                   else:
104  1✔                  f = Future()
105  1✔                  f.set_result(None)
106  1✔                  task_dict = {
107                           "fn": fn,
108                           "args": args,
109                           "kwargs": kwargs,
110                           "future": f,
111                           "resource_dict": resource_dict,
112                       }
113  1✔                  task_hash = generate_task_hash(
114                           task_dict=task_dict,
115                           future_hash_inverse_dict={
116                               v: k for k, v in self._future_hash_dict.items()
117                           },
118                       )
119  1✔                  self._future_hash_dict[task_hash] = f
120  1✔                  self._task_hash_dict[task_hash] = task_dict
121  1✔              return f
122
123  1✔          def __exit__(
124                   self,
125                   exc_type: Any,
126                   exc_val: Any,
127                   exc_tb: Any,
128               ) -> None:
129                   """
130                   Exit method called when exiting the context manager.
131
132                   Args:
133                       exc_type: The type of the exception.
134                       exc_val: The exception instance.
135                       exc_tb: The traceback object.
136
137                   """
138  1✔              super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
139  1✔              if self._generate_dependency_graph:
140  1✔                  node_lst, edge_lst = generate_nodes_and_edges(
141                           task_hash_dict=self._task_hash_dict,
142                           future_hash_inverse_dict={
143                               v: k for k, v in self._future_hash_dict.items()
144                           },
145                       )
146  1✔                  return draw(node_lst=node_lst, edge_lst=edge_lst)
147
148
149  1✔      def create_executor(
150               max_workers: Optional[int] = None,
151               backend: str = "local",
152               max_cores: Optional[int] = None,
153               cache_directory: Optional[str] = None,
154               resource_dict: Optional[dict] = None,
155               flux_executor=None,
156               flux_executor_pmi_mode: Optional[str] = None,
157               flux_executor_nesting: bool = False,
158               flux_log_files: bool = False,
159               hostname_localhost: Optional[bool] = None,
160               block_allocation: bool = False,
161               init_function: Optional[callable] = None,
162           ):
163               """
164               Instead of returning an executorlib.Executor object this function returns either an executorlib.mpi.PyMPIExecutor,
165               an executorlib.slurm.PySlurmExecutor or an executorlib.flux.PyFluxExecutor depending on which backend is available. The
166               executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
167               for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
168               installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
169               requires the SLURM workload manager to be installed on the system.
170
171               Args:
172                   max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
173                                      cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
174                                      recommended, as computers have a limited number of compute cores.
175                   backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
176                   max_cores (int): defines the number of cores which can be used in parallel
177                   cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
178                   resource_dict (dict): A dictionary of resources required by the task. With the following keys:
179                                         - cores (int): number of MPI cores to be used for each function call
180                                         - threads_per_core (int): number of OpenMP threads to be used for each function call
181                                         - gpus_per_core (int): number of GPUs per worker - defaults to 0
182                                         - cwd (str/None): current working directory where the parallel python task is executed
183                                         - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
184                                                                         SLURM only) - default False
185                                         - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
186                   flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
187                   flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
188                   flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
189                   hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
190                                                 context of an HPC cluster this is essential to be able to communicate with an
191                                                 Executor running on a different compute node within the same allocation. In
192                                                 principle any computer should be able to resolve that its own hostname points to
193                                                 the same address as localhost. Still, macOS >= 12 seems to disable this lookup
194                                                 for security reasons, so on macOS this option has to be set to true.
195                   block_allocation (boolean): To accelerate the submission of a series of python functions with the same
196                                               resource requirements, executorlib supports block allocation. In this case all
197                                               resources have to be defined on the executor, rather than during the submission
198                                               of the individual function.
199                   init_function (callable, optional): function to preset arguments for functions which are submitted later
200               """
201  1✔          check_init_function(block_allocation=block_allocation, init_function=init_function)
202  1✔          if flux_executor is not None and backend != "flux_allocation":
203  × UNCOV         backend = "flux_allocation"
204  1✔          check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
205  1✔          cores_per_worker = resource_dict["cores"]
206  1✔          resource_dict["cache_directory"] = cache_directory
207  1✔          resource_dict["hostname_localhost"] = hostname_localhost
208  1✔          if backend == "flux_allocation":
209  1✔              check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
210  1✔              check_command_line_argument_lst(
211                       command_line_argument_lst=resource_dict["slurm_cmd_args"]
212                   )
213  1✔              del resource_dict["openmpi_oversubscribe"]
214  1✔              del resource_dict["slurm_cmd_args"]
215  1✔              resource_dict["flux_executor"] = flux_executor
216  1✔              resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
217  1✔              resource_dict["flux_executor_nesting"] = flux_executor_nesting
218  1✔              resource_dict["flux_log_files"] = flux_log_files
219  1✔              if block_allocation:
220  1✔                  resource_dict["init_function"] = init_function
221  1✔                  return InteractiveExecutor(
222                           max_workers=validate_number_of_cores(
223                               max_cores=max_cores,
224                               max_workers=max_workers,
225                               cores_per_worker=cores_per_worker,
226                               set_local_cores=False,
227                           ),
228                           executor_kwargs=resource_dict,
229                           spawner=FluxPythonSpawner,
230                       )
231                   else:
232  × UNCOV             return InteractiveStepExecutor(
233                           max_cores=max_cores,
234                           max_workers=max_workers,
235                           executor_kwargs=resource_dict,
236                           spawner=FluxPythonSpawner,
237                       )
238  1✔          elif backend == "slurm_allocation":
239  × UNCOV         check_executor(executor=flux_executor)
240  ×               check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
241  × NEW           check_flux_log_files(flux_log_files=flux_log_files)
242  ×               if block_allocation:
243  ×                   resource_dict["init_function"] = init_function
244  ×                   return InteractiveExecutor(
245                           max_workers=validate_number_of_cores(
246                               max_cores=max_cores,
247                               max_workers=max_workers,
248                               cores_per_worker=cores_per_worker,
249                               set_local_cores=False,
250                           ),
251                           executor_kwargs=resource_dict,
252                           spawner=SrunSpawner,
253                       )
254                   else:
255  ×                   return InteractiveStepExecutor(
256                           max_cores=max_cores,
257                           max_workers=max_workers,
258                           executor_kwargs=resource_dict,
259                           spawner=SrunSpawner,
260                       )
261  1✔          elif backend == "local":
262  1✔              check_executor(executor=flux_executor)
263  1✔              check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
264  1✔              check_flux_log_files(flux_log_files=flux_log_files)
265  1✔              check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
266  1✔              check_command_line_argument_lst(
267                       command_line_argument_lst=resource_dict["slurm_cmd_args"]
268                   )
269  1✔              del resource_dict["threads_per_core"]
270  1✔              del resource_dict["gpus_per_core"]
271  1✔              del resource_dict["slurm_cmd_args"]
272  1✔              if block_allocation:
273  1✔                  resource_dict["init_function"] = init_function
274  1✔                  return InteractiveExecutor(
275                           max_workers=validate_number_of_cores(
276                               max_cores=max_cores,
277                               max_workers=max_workers,
278                               cores_per_worker=cores_per_worker,
279                               set_local_cores=True,
280                           ),
281                           executor_kwargs=resource_dict,
282                           spawner=MpiExecSpawner,
283                       )
284                   else:
285  1✔                  return InteractiveStepExecutor(
286                           max_cores=max_cores,
287                           max_workers=max_workers,
288                           executor_kwargs=resource_dict,
289                           spawner=MpiExecSpawner,
290                       )
291               else:
292  1✔              raise ValueError(
293                       "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
294                   )
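
For orientation, a minimal usage sketch of the file shown above (not part of the covered source): it drives the backend="local" branch of create_executor() through ExecutorWithDependencies. The resource_dict keys are taken from the docstring; that a Future returned by one submit() call can be passed as an argument to a later one is assumed from the execute_tasks_with_dependencies helper, not confirmed by this report.

from executorlib.interactive.executor import ExecutorWithDependencies

def add(x, y):
    return x + y

# keys as documented in the create_executor docstring above
resource_dict = {
    "cores": 1,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}

with ExecutorWithDependencies(
    backend="local",
    max_cores=2,
    resource_dict=resource_dict,
) as exe:
    f1 = exe.submit(add, 1, 2)
    f2 = exe.submit(add, f1, 3)  # assumed: f1 resolves before this task runs
    print(f2.result())  # 6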
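
A second sketch for the plot_dependency_graph branch visible in submit() and __exit__() above: with plot_dependency_graph=True, submit() only records each task and hands back a placeholder Future with result None, and __exit__() renders the recorded graph via generate_nodes_and_edges() and draw(). The argument values are illustrative, not taken from this build.

from executorlib.interactive.executor import ExecutorWithDependencies

def add(x, y):
    return x + y

with ExecutorWithDependencies(
    backend="local",
    max_cores=1,
    plot_dependency_graph=True,  # record tasks instead of executing them
    resource_dict={
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    },
) as exe:
    f1 = exe.submit(add, 1, 2)
    f2 = exe.submit(add, f1, 3)  # recorded as an edge f1 -> f2
    print(f1.result())  # None: placeholder future, nothing was executed
# on exit, the dependency graph of the two tasks is drawn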