• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13121243729

03 Feb 2025 07:24PM UTC coverage: 95.735% (-0.3%) from 95.996%
13121243729

Pull #548

github

web-flow
Merge 0cf4b6a75 into 11d44cc6f
Pull Request #548: [major] Refactor the Executor interface

127 of 143 new or added lines in 4 files covered. (88.81%)

1100 of 1149 relevant lines covered (95.74%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/executorlib/interfaces/single.py
1
from typing import Callable, Optional, Union
1✔
2

3
from executorlib.interactive.executor import ExecutorWithDependencies
1✔
4
from executorlib.interactive.shared import (
1✔
5
    InteractiveExecutor,
6
    InteractiveStepExecutor,
7
)
8
from executorlib.standalone.inputcheck import (
1✔
9
    check_command_line_argument_lst,
10
    check_gpus_per_worker,
11
    check_init_function,
12
    check_plot_dependency_graph,
13
    check_refresh_rate,
14
    validate_number_of_cores,
15
)
16
from executorlib.standalone.interactive.spawner import MpiExecSpawner
1✔
17

18

19
class SingleNodeExecutor:
    """
    The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
    preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
    the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
    require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
    in an interactive Jupyter notebook.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        max_cores (int): defines the number of cores which can be used in parallel
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
                                                              SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
                                    requirements, executorlib supports block allocation. In this case all resources have
                                    to be defined on the executor, rather than during the submission of the individual
                                    function.
        init_function (None): optional function to preset arguments for functions which are submitted later
        disable_dependencies (boolean): Disable resolving future objects during the submission.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                      debugging purposes and to get an overview of the specified dependencies.
        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

    Examples:
        ```
        >>> import numpy as np
        >>> from executorlib.interfaces.single import SingleNodeExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with SingleNodeExecutor(max_workers=2, init_function=init_k) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
        ```
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
        pass

    def __new__(
        cls,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        """
        Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
        executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
        executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
        for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be
        installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
        requires the SLURM workload manager to be installed on the system.

        Args:
            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                               number of cores which can be used in parallel - just like the max_cores parameter. Using
                               max_cores is recommended, as computers have a limited number of compute cores.
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
            max_cores (int): defines the number of cores which can be used in parallel
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel python task is executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                                  and SLURM only) - default False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                           only)
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                        resource requirements, executorlib supports block allocation. In this case all
                                        resources have to be defined on the executor, rather than during the submission
                                        of the individual function.
            init_function (None): optional function to preset arguments for functions which are submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
                                          debugging purposes and to get an overview of the specified dependencies.
            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

        """
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        }
        # Merge the defaults into a NEW dictionary so the caller-supplied
        # resource_dict is never mutated; user-provided keys take precedence.
        if resource_dict is None:
            resource_dict = {}
        resource_dict = {**default_resource_dict, **resource_dict}
        if not disable_dependencies:
            return ExecutorWithDependencies(
                executor=create_single_node_executor(
                    max_workers=max_workers,
                    cache_directory=cache_directory,
                    max_cores=max_cores,
                    resource_dict=resource_dict,
                    hostname_localhost=hostname_localhost,
                    block_allocation=block_allocation,
                    init_function=init_function,
                ),
                max_cores=max_cores,
                refresh_rate=refresh_rate,
                plot_dependency_graph=plot_dependency_graph,
                plot_dependency_graph_filename=plot_dependency_graph_filename,
            )
        else:
            # Without dependency resolution these options have no effect, so
            # reject non-default values early instead of silently ignoring them.
            check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
            check_refresh_rate(refresh_rate=refresh_rate)
            return create_single_node_executor(
                max_workers=max_workers,
                cache_directory=cache_directory,
                max_cores=max_cores,
                resource_dict=resource_dict,
                hostname_localhost=hostname_localhost,
                block_allocation=block_allocation,
                init_function=init_function,
            )
194

195

196
def create_single_node_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
    Create a single node executor

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                           number of cores which can be used in parallel - just like the max_cores parameter. Using
                           max_cores is recommended, as computers have a limited number of compute cores.
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                       only)
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                  context of an HPC cluster this is essential to be able to communicate to an
                                  Executor running on a different compute node within the same allocation. And
                                  in principle any computer should be able to resolve that their own hostname
                                  points to the same address as localhost. Still MacOS >= 12 seems to disable
                                  this look up for security reasons. So on MacOS it is required to set this
                                  option to true
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case all
                                    resources have to be defined on the executor, rather than during the submission
                                    of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later

    Returns:
        InteractiveStepExecutor/ InteractiveExecutor
    """
    # Work on a shallow copy: this function adds and removes keys below, and
    # the caller's dictionary must not be modified as a side effect.
    resource_dict = {} if resource_dict is None else dict(resource_dict)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost

    check_init_function(block_allocation=block_allocation, init_function=init_function)
    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
    )
    # These settings are not understood by the MpiExecSpawner-based executors,
    # so they are validated above and stripped before being forwarded.
    for unsupported_key in ("threads_per_core", "gpus_per_core", "slurm_cmd_args"):
        resource_dict.pop(unsupported_key, None)
    if block_allocation:
        # Block allocation: a fixed pool of workers shares the init_function.
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        # Per-task allocation: resources are resolved for each submission.
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc