• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 11907763573

19 Nov 2024 06:52AM UTC coverage: 95.648%. Remained the same
11907763573

push

github

web-flow
[pre-commit.ci] pre-commit autoupdate (#502)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.7.3 → v0.7.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.7.3...v0.7.4)

* Update environment-openmpi.yml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>

945 of 988 relevant lines covered (95.65%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.85
/executorlib/standalone/interactive/spawner.py
1
import subprocess
1✔
2
from abc import ABC
1✔
3
from typing import Optional
1✔
4

5
MPI_COMMAND = "mpiexec"
1✔
6
SLURM_COMMAND = "srun"
1✔
7

8

9
class BaseSpawner(ABC):
1✔
10
    def __init__(self, cwd: str, cores: int = 1, openmpi_oversubscribe: bool = False):
1✔
11
        """
12
        Base class for interface implementations.
13

14
        Args:
15
            cwd (str): The current working directory.
16
            cores (int, optional): The number of cores to use. Defaults to 1.
17
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
18
        """
19
        self._cwd = cwd
1✔
20
        self._cores = cores
1✔
21
        self._openmpi_oversubscribe = openmpi_oversubscribe
1✔
22

23
    def bootup(
1✔
24
        self,
25
        command_lst: list[str],
26
    ):
27
        """
28
        Method to start the interface.
29

30
        Args:
31
            command_lst (list[str]): The command list to execute.
32
        """
33
        raise NotImplementedError
×
34

35
    def shutdown(self, wait: bool = True):
1✔
36
        """
37
        Method to shutdown the interface.
38

39
        Args:
40
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
41
        """
42
        raise NotImplementedError
×
43

44
    def poll(self):
1✔
45
        """
46
        Method to check if the interface is running.
47

48
        Returns:
49
            bool: True if the interface is running, False otherwise.
50
        """
51
        raise NotImplementedError
×
52

53

54
class SubprocessSpawner(BaseSpawner):
1✔
55
    def __init__(
1✔
56
        self,
57
        cwd: Optional[str] = None,
58
        cores: int = 1,
59
        openmpi_oversubscribe: bool = False,
60
    ):
61
        """
62
        Subprocess interface implementation.
63

64
        Args:
65
            cwd (str, optional): The current working directory. Defaults to None.
66
            cores (int, optional): The number of cores to use. Defaults to 1.
67
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
68
        """
69
        super().__init__(
1✔
70
            cwd=cwd,
71
            cores=cores,
72
            openmpi_oversubscribe=openmpi_oversubscribe,
73
        )
74
        self._process = None
1✔
75

76
    def bootup(
1✔
77
        self,
78
        command_lst: list[str],
79
    ):
80
        """
81
        Method to start the subprocess interface.
82

83
        Args:
84
            command_lst (list[str]): The command list to execute.
85
        """
86
        self._process = subprocess.Popen(
1✔
87
            args=self.generate_command(command_lst=command_lst),
88
            cwd=self._cwd,
89
            stdin=subprocess.DEVNULL,
90
        )
91

92
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
93
        """
94
        Method to generate the command list.
95

96
        Args:
97
            command_lst (list[str]): The command list.
98

99
        Returns:
100
            list[str]: The generated command list.
101
        """
102
        return command_lst
1✔
103

104
    def shutdown(self, wait: bool = True):
1✔
105
        """
106
        Method to shutdown the subprocess interface.
107

108
        Args:
109
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
110
        """
111
        self._process.communicate()
1✔
112
        self._process.terminate()
1✔
113
        if wait:
1✔
114
            self._process.wait()
1✔
115
        self._process = None
1✔
116

117
    def poll(self) -> bool:
1✔
118
        """
119
        Method to check if the subprocess interface is running.
120

121
        Returns:
122
            bool: True if the interface is running, False otherwise.
123
        """
124
        return self._process is not None and self._process.poll() is None
1✔
125

126

127
class MpiExecSpawner(SubprocessSpawner):
1✔
128
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
129
        """
130
        Generate the command list for the MPIExec interface.
131

132
        Args:
133
            command_lst (list[str]): The command list.
134

135
        Returns:
136
            list[str]: The generated command list.
137
        """
138
        command_prepend_lst = generate_mpiexec_command(
1✔
139
            cores=self._cores,
140
            openmpi_oversubscribe=self._openmpi_oversubscribe,
141
        )
142
        return super().generate_command(
1✔
143
            command_lst=command_prepend_lst + command_lst,
144
        )
145

146

147
class SrunSpawner(SubprocessSpawner):
1✔
148
    def __init__(
1✔
149
        self,
150
        cwd: Optional[str] = None,
151
        cores: int = 1,
152
        threads_per_core: int = 1,
153
        gpus_per_core: int = 0,
154
        openmpi_oversubscribe: bool = False,
155
        slurm_cmd_args: list[str] = [],
156
    ):
157
        """
158
        Srun interface implementation.
159

160
        Args:
161
            cwd (str, optional): The current working directory. Defaults to None.
162
            cores (int, optional): The number of cores to use. Defaults to 1.
163
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
164
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
165
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
166
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
167
        """
168
        super().__init__(
1✔
169
            cwd=cwd,
170
            cores=cores,
171
            openmpi_oversubscribe=openmpi_oversubscribe,
172
        )
173
        self._threads_per_core = threads_per_core
1✔
174
        self._gpus_per_core = gpus_per_core
1✔
175
        self._slurm_cmd_args = slurm_cmd_args
1✔
176

177
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
178
        """
179
        Generate the command list for the Srun interface.
180

181
        Args:
182
            command_lst (list[str]): The command list.
183

184
        Returns:
185
            list[str]: The generated command list.
186
        """
187
        command_prepend_lst = generate_slurm_command(
1✔
188
            cores=self._cores,
189
            cwd=self._cwd,
190
            threads_per_core=self._threads_per_core,
191
            gpus_per_core=self._gpus_per_core,
192
            openmpi_oversubscribe=self._openmpi_oversubscribe,
193
            slurm_cmd_args=self._slurm_cmd_args,
194
        )
195
        return super().generate_command(
1✔
196
            command_lst=command_prepend_lst + command_lst,
197
        )
198

199

200
def generate_mpiexec_command(
1✔
201
    cores: int, openmpi_oversubscribe: bool = False
202
) -> list[str]:
203
    """
204
    Generate the command list for the MPIExec interface.
205

206
    Args:
207
        cores (int): The number of cores.
208
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
209

210
    Returns:
211
        list[str]: The generated command list.
212
    """
213
    if cores == 1:
1✔
214
        return []
1✔
215
    else:
216
        command_prepend_lst = [MPI_COMMAND, "-n", str(cores)]
1✔
217
        if openmpi_oversubscribe:
1✔
218
            command_prepend_lst += ["--oversubscribe"]
1✔
219
        return command_prepend_lst
1✔
220

221

222
def generate_slurm_command(
1✔
223
    cores: int,
224
    cwd: str,
225
    threads_per_core: int = 1,
226
    gpus_per_core: int = 0,
227
    openmpi_oversubscribe: bool = False,
228
    slurm_cmd_args: list[str] = [],
229
) -> list[str]:
230
    """
231
    Generate the command list for the SLURM interface.
232

233
    Args:
234
        cores (int): The number of cores.
235
        cwd (str): The current working directory.
236
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
237
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
238
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
239
        slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
240

241
    Returns:
242
        list[str]: The generated command list.
243
    """
244
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
1✔
245
    if cwd is not None:
1✔
246
        command_prepend_lst += ["-D", cwd]
1✔
247
    if threads_per_core > 1:
1✔
248
        command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)]
×
249
    if gpus_per_core > 0:
1✔
250
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
1✔
251
    if openmpi_oversubscribe:
1✔
252
        command_prepend_lst += ["--oversubscribe"]
1✔
253
    if len(slurm_cmd_args) > 0:
1✔
254
        command_prepend_lst += slurm_cmd_args
1✔
255
    return command_prepend_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc