• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12425281064

20 Dec 2024 04:00AM UTC coverage: 94.253% (-1.5%) from 95.78%
12425281064

Pull #528

github

web-flow
Merge 0b9ee350a into 44214f755
Pull Request #528: Move SLURM to separate module

26 of 27 new or added lines in 3 files covered. (96.3%)

16 existing lines in 1 file now uncovered.

984 of 1044 relevant lines covered (94.25%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.23
/executorlib/standalone/interactive/spawner.py
1
import subprocess
1✔
2
from abc import ABC
1✔
3
from typing import Optional
1✔
4

5
# Launcher executable placed at the front of the command built by generate_mpiexec_command().
MPI_COMMAND = "mpiexec"
1✔
6
# Launcher executable placed at the front of the command built by generate_slurm_command().
SLURM_COMMAND = "srun"
1✔
7

8

9
class BaseSpawner(ABC):
    """
    Base class for interface implementations.

    The base class only stores the configuration. bootup(), shutdown() and poll() raise
    NotImplementedError here and have to be overridden by subclasses.
    """

    def __init__(
        self,
        cwd: Optional[str],
        cores: int = 1,
        openmpi_oversubscribe: bool = False,
    ):
        """
        Store the configuration shared by all interface implementations.

        Args:
            cwd (str, optional): The current working directory. Subclasses in this module pass None when no
                                 directory was selected.
            cores (int, optional): The number of cores to use. Defaults to 1.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
        """
        self._cwd = cwd
        self._cores = cores
        self._openmpi_oversubscribe = openmpi_oversubscribe

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Method to start the interface.

        Args:
            command_lst (list[str]): The command list to execute.

        Raises:
            NotImplementedError: Always, the base class does not start anything.
        """
        raise NotImplementedError

    def shutdown(self, wait: bool = True):
        """
        Method to shutdown the interface.

        Args:
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.

        Raises:
            NotImplementedError: Always, the base class has nothing to shut down.
        """
        raise NotImplementedError

    def poll(self) -> bool:
        """
        Method to check if the interface is running.

        Returns:
            bool: True if the interface is running, False otherwise.

        Raises:
            NotImplementedError: Always, the base class has nothing to monitor.
        """
        raise NotImplementedError
×
52

53

54
class SubprocessSpawner(BaseSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        openmpi_oversubscribe: bool = False,
        threads_per_core: int = 1,
    ):
        """
        Subprocess interface implementation.

        Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int, optional): The number of cores to use. Defaults to 1.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        """
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        # Handle of the running child process; None while nothing is running.
        self._process = None
        self._threads_per_core = threads_per_core

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Method to start the subprocess interface.

        Args:
            command_lst (list[str]): The command list to execute.
        """
        self._process = subprocess.Popen(
            args=self.generate_command(command_lst=command_lst),
            cwd=self._cwd,
            # The child never reads from the parent's standard input.
            stdin=subprocess.DEVNULL,
        )

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Method to generate the command list.

        The plain subprocess interface executes the command unchanged; subclasses override this
        method to prepend launcher arguments.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list.
        """
        return command_lst

    def shutdown(self, wait: bool = True):
        """
        Method to shutdown the subprocess interface.

        Calling this method before bootup() or a second time is a no-op.

        Args:
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
        """
        if self._process is None:
            # Never booted or already shut down - there is no process to clean up.
            return
        # communicate() returns only once the child has exited, so terminate() and wait()
        # below merely act as a safety net.
        self._process.communicate()
        self._process.terminate()
        if wait:
            self._process.wait()
        self._process = None

    def poll(self) -> bool:
        """
        Method to check if the subprocess interface is running.

        Returns:
            bool: True if the interface is running, False otherwise.
        """
        return self._process is not None and self._process.poll() is None
1✔
128

129

130
class MpiExecSpawner(SubprocessSpawner):
    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Build the command list for the MPIExec interface.

        The launcher arguments returned by generate_mpiexec_command() are placed in front of the
        given command before it is handed to the parent implementation.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list.
        """
        mpi_prefix = generate_mpiexec_command(
            cores=self._cores,
            openmpi_oversubscribe=self._openmpi_oversubscribe,
        )
        full_command = mpi_prefix + command_lst
        return super().generate_command(command_lst=full_command)
148

149

150
class SrunSpawner(SubprocessSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
    ):
        """
        Srun interface implementation.

        Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int, optional): The number of cores to use. Defaults to 1.
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None, which is
                                                  treated as no additional arguments.
        """
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
            threads_per_core=threads_per_core,
        )
        self._gpus_per_core = gpus_per_core
        # A fresh list per instance: a mutable default in the signature would be shared by
        # every spawner created without explicit arguments.
        self._slurm_cmd_args = [] if slurm_cmd_args is None else slurm_cmd_args

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Generate the command list for the Srun interface.

        The launcher arguments returned by generate_slurm_command() are placed in front of the
        given command.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list.
        """
        command_prepend_lst = generate_slurm_command(
            cores=self._cores,
            cwd=self._cwd,
            threads_per_core=self._threads_per_core,
            gpus_per_core=self._gpus_per_core,
            openmpi_oversubscribe=self._openmpi_oversubscribe,
            slurm_cmd_args=self._slurm_cmd_args,
        )
        return super().generate_command(
            command_lst=command_prepend_lst + command_lst,
        )
201

202

203
def generate_mpiexec_command(
    cores: int, openmpi_oversubscribe: bool = False
) -> list[str]:
    """
    Build the launcher prefix for the MPIExec interface.

    Args:
        cores (int): The number of cores.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.

    Returns:
        list[str]: The launcher arguments to prepend; empty when only a single core is requested.
    """
    # A single core is executed without any launcher.
    if cores == 1:
        return []
    oversubscribe_flag = ["--oversubscribe"] if openmpi_oversubscribe else []
    return [MPI_COMMAND, "-n", str(cores), *oversubscribe_flag]
1✔
223

224

225
def generate_slurm_command(
    cores: int,
    cwd: Optional[str],
    threads_per_core: int = 1,
    gpus_per_core: int = 0,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: Optional[list[str]] = None,
) -> list[str]:
    """
    Generate the command list for the SLURM interface.

    Args:
        cores (int): The number of cores.
        cwd (str, optional): The current working directory. No directory option is added when None.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
        slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None, which is
                                              treated as no additional arguments.

    Returns:
        list[str]: The generated command list.
    """
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
    if cwd is not None:
        command_prepend_lst += ["-D", cwd]
    if threads_per_core > 1:
        # Long options take their value after "="; without it the flag and the number fuse
        # into a single unknown option.
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if openmpi_oversubscribe:
        command_prepend_lst += ["--oversubscribe"]
    if slurm_cmd_args:
        # User supplied arguments go last and are appended unchanged.
        command_prepend_lst += slurm_cmd_args
    return command_prepend_lst
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc