
pyiron / executorlib / 13091277497

01 Feb 2025 06:25PM UTC coverage: 96.085% (-0.5%) from 96.536%

Pull Request #548: [major] Refactor the Executor interface
Merge 5072318f8 into 2a5c10963

124 of 137 new or added lines in 4 files covered. (90.51%)
1080 of 1124 relevant lines covered (96.09%)
0.96 hits per line

Source File: /executorlib/interfaces/single.py (97.14% covered)
from typing import Callable, Optional, Union

from executorlib.interactive.executor import ExecutorWithDependencies
from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_gpus_per_worker,
    check_init_function,
    check_plot_dependency_graph,
    check_refresh_rate,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner


class SingleNodeExecutor:
    """
    The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
    preferably the flux framework for distributing python functions within a given resource allocation. In contrast to
    the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
    require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
    in an interactive Jupyter notebook.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        max_cores (int): defines the number of cores which can be used in parallel
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname points to
                                      the same address as localhost, but MacOS >= 12 seems to disable this lookup for
                                      security reasons, so on MacOS this option has to be set to true.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
                                    requirements, executorlib supports block allocation. In this case all resources have
                                    to be defined on the executor, rather than during the submission of the individual
                                    function.
        init_function (Callable, optional): function to preset arguments for functions which are submitted later
        disable_dependencies (boolean): Disable resolving future objects during the submission.
        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                      debugging purposes and to get an overview of the specified dependencies.
        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    Examples:
        ```
        >>> import numpy as np
        >>> from executorlib.interfaces.local import SingleNodeExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with SingleNodeExecutor(cores=2, init_function=init_k) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
        ```
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
        pass

    def __new__(
        cls,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        """
        Instead of returning an executorlib.Executor object this function returns either an
        executorlib.mpi.PyMPIExecutor, executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending
        on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice, while the
        executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
        executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be installed and in addition
        flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the SLURM
        workload manager to be installed on the system.

        Args:
            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                               number of cores which can be used in parallel - just like the max_cores parameter. Using
                               max_cores is recommended, as computers have a limited number of compute cores.
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
            max_cores (int): defines the number of cores which can be used in parallel
            resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel python task is executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                                  and SLURM only) - default False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                           only)
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                          context of an HPC cluster this is essential to be able to communicate with an
                                          Executor running on a different compute node within the same allocation. In
                                          principle any computer should be able to resolve that its own hostname points
                                          to the same address as localhost, but MacOS >= 12 seems to disable this
                                          lookup for security reasons, so on MacOS this option has to be set to true.
            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                        resource requirements, executorlib supports block allocation. In this case all
                                        resources have to be defined on the executor, rather than during the submission
                                        of the individual function.
            init_function (Callable, optional): function to preset arguments for functions which are submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                          debugging purposes and to get an overview of the specified dependencies.
            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
        """
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        }
        if resource_dict is None:
            resource_dict = {}
        resource_dict.update(
            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
        )
        if not disable_dependencies:
            return ExecutorWithDependencies(
                executor=create_local_executor(
                    max_workers=max_workers,
                    cache_directory=cache_directory,
                    max_cores=max_cores,
                    resource_dict=resource_dict,
                    hostname_localhost=hostname_localhost,
                    block_allocation=block_allocation,
                    init_function=init_function,
                ),
                max_cores=max_cores,
                refresh_rate=refresh_rate,
                plot_dependency_graph=plot_dependency_graph,
                plot_dependency_graph_filename=plot_dependency_graph_filename,
            )
        else:
            check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
            check_refresh_rate(refresh_rate=refresh_rate)
            return create_local_executor(
                max_workers=max_workers,
                cache_directory=cache_directory,
                max_cores=max_cores,
                resource_dict=resource_dict,
                hostname_localhost=hostname_localhost,
                block_allocation=block_allocation,
                init_function=init_function,
            )


def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost

    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    if "threads_per_core" in resource_dict.keys():
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict.keys():
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
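
The listing above only documents the interface; a short usage sketch may help. The block below is a minimal, hypothetical example based on the SingleNodeExecutor signature shown above: the import path is assumed from this file's location, and the worker count and resource_dict values are illustrative, not values taken from the pull request.

```
# Hypothetical usage sketch: block allocation with a fixed resource definition.
# Assumptions: executorlib is installed and SingleNodeExecutor is importable from
# executorlib.interfaces.single (the module shown in this report).
from executorlib.interfaces.single import SingleNodeExecutor


def multiply(a, b):
    return a * b


if __name__ == "__main__":
    # block_allocation=True pre-defines the resources on the executor, so a series
    # of submissions with identical requirements avoids per-call setup.
    with SingleNodeExecutor(
        max_cores=2,
        resource_dict={"cores": 1},
        block_allocation=True,
    ) as exe:
        futures = [exe.submit(multiply, i, b=2) for i in range(4)]
        print([f.result() for f in futures])  # expected: [0, 2, 4, 6]
```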
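
The default disable_dependencies=False routes submissions through ExecutorWithDependencies, which, per the docstring, resolves future objects during submission. The sketch below illustrates that behaviour under the same import-path assumption; the chained arithmetic and variable names are purely illustrative.

```
# Hypothetical sketch of dependency resolution between submitted tasks.
from executorlib.interfaces.single import SingleNodeExecutor


def add(x, y):
    return x + y


if __name__ == "__main__":
    with SingleNodeExecutor(max_cores=1) as exe:
        first = exe.submit(add, 1, 2)       # returns a concurrent.futures.Future
        second = exe.submit(add, first, 3)  # the Future is resolved before add() runs
        print(second.result())              # expected: 6
```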