• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 10230589389

03 Aug 2024 08:03PM UTC coverage: 94.363% (+0.006%) from 94.357%
10230589389

Pull #385

github

web-flow
Merge 21a0b37e6 into 4255d1c26
Pull Request #385: Add docstrings and typehints with Github copilot

46 of 47 new or added lines in 11 files covered. (97.87%)

4 existing lines in 2 files now uncovered.

904 of 958 relevant lines covered (94.36%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/executorlib/shared/interface.py
1
import subprocess
1✔
2
from abc import ABC
1✔
3
from typing import Optional
1✔
4

5
import conda_subprocess
1✔
6

7
MPI_COMMAND = "mpiexec"
1✔
8
SLURM_COMMAND = "srun"
1✔
9

10

11
class BaseInterface(ABC):
1✔
12
    def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False):
1✔
13
        """
14
        Base class for interface implementations.
15

16
        Args:
17
            cwd (str): The current working directory.
18
            cores (int, optional): The number of cores to use. Defaults to 1.
19
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
20
        """
21
        self._cwd = cwd
1✔
22
        self._cores = cores
1✔
23
        self._oversubscribe = oversubscribe
1✔
24

25
    def bootup(
1✔
26
        self,
27
        command_lst: list[str],
28
        prefix_name: Optional[str] = None,
29
        prefix_path: Optional[str] = None,
30
    ):
31
        """
32
        Method to start the interface.
33

34
        Args:
35
            command_lst (list[str]): The command list to execute.
36
            prefix_name (str, optional): The prefix name. Defaults to None.
37
            prefix_path (str, optional): The prefix path. Defaults to None.
38
        """
UNCOV
39
        raise NotImplementedError
×
40

41
    def shutdown(self, wait: bool = True):
1✔
42
        """
43
        Method to shutdown the interface.
44

45
        Args:
46
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
47
        """
UNCOV
48
        raise NotImplementedError
×
49

50
    def poll(self):
1✔
51
        """
52
        Method to check if the interface is running.
53

54
        Returns:
55
            bool: True if the interface is running, False otherwise.
56
        """
UNCOV
57
        raise NotImplementedError
×
58

59

60
class SubprocessInterface(BaseInterface):
1✔
61
    def __init__(
1✔
62
        self,
63
        cwd: Optional[str] = None,
64
        cores: int = 1,
65
        oversubscribe: bool = False,
66
    ):
67
        """
68
        Subprocess interface implementation.
69

70
        Args:
71
            cwd (str, optional): The current working directory. Defaults to None.
72
            cores (int, optional): The number of cores to use. Defaults to 1.
73
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
74
        """
75
        super().__init__(
1✔
76
            cwd=cwd,
77
            cores=cores,
78
            oversubscribe=oversubscribe,
79
        )
80
        self._process = None
1✔
81

82
    def bootup(
1✔
83
        self,
84
        command_lst: list[str],
85
        prefix_name: Optional[str] = None,
86
        prefix_path: Optional[str] = None,
87
    ):
88
        """
89
        Method to start the subprocess interface.
90

91
        Args:
92
            command_lst (list[str]): The command list to execute.
93
            prefix_name (str, optional): The prefix name. Defaults to None.
94
            prefix_path (str, optional): The prefix path. Defaults to None.
95
        """
96
        if prefix_name is None and prefix_path is None:
1✔
97
            self._process = subprocess.Popen(
1✔
98
                args=self.generate_command(command_lst=command_lst),
99
                cwd=self._cwd,
100
                stdin=subprocess.DEVNULL,
101
            )
102
        else:
103
            self._process = conda_subprocess.Popen(
1✔
104
                args=self.generate_command(command_lst=command_lst),
105
                cwd=self._cwd,
106
                stdin=subprocess.DEVNULL,
107
                prefix_path=prefix_path,
108
                prefix_name=prefix_name,
109
            )
110

111
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
112
        """
113
        Method to generate the command list.
114

115
        Args:
116
            command_lst (list[str]): The command list.
117

118
        Returns:
119
            list[str]: The generated command list.
120
        """
121
        return command_lst
1✔
122

123
    def shutdown(self, wait: bool = True):
1✔
124
        """
125
        Method to shutdown the subprocess interface.
126

127
        Args:
128
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
129
        """
130
        self._process.communicate()
1✔
131
        self._process.terminate()
1✔
132
        if wait:
1✔
133
            self._process.wait()
1✔
134
        self._process = None
1✔
135

136
    def poll(self) -> bool:
1✔
137
        """
138
        Method to check if the subprocess interface is running.
139

140
        Returns:
141
            bool: True if the interface is running, False otherwise.
142
        """
143
        return self._process is not None and self._process.poll() is None
1✔
144

145

146
class MpiExecInterface(SubprocessInterface):
1✔
147
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
148
        """
149
        Generate the command list for the MPIExec interface.
150

151
        Args:
152
            command_lst (list[str]): The command list.
153

154
        Returns:
155
            list[str]: The generated command list.
156
        """
157
        command_prepend_lst = generate_mpiexec_command(
1✔
158
            cores=self._cores,
159
            oversubscribe=self._oversubscribe,
160
        )
161
        return super().generate_command(
1✔
162
            command_lst=command_prepend_lst + command_lst,
163
        )
164

165

166
class SrunInterface(SubprocessInterface):
1✔
167
    def __init__(
1✔
168
        self,
169
        cwd: Optional[str] = None,
170
        cores: int = 1,
171
        threads_per_core: int = 1,
172
        gpus_per_core: int = 0,
173
        oversubscribe: bool = False,
174
        command_line_argument_lst: list[str] = [],
175
    ):
176
        """
177
        Srun interface implementation.
178

179
        Args:
180
            cwd (str, optional): The current working directory. Defaults to None.
181
            cores (int, optional): The number of cores to use. Defaults to 1.
182
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
183
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
184
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
185
            command_line_argument_lst (list[str], optional): Additional command line arguments. Defaults to [].
186
        """
187
        super().__init__(
1✔
188
            cwd=cwd,
189
            cores=cores,
190
            oversubscribe=oversubscribe,
191
        )
192
        self._threads_per_core = threads_per_core
1✔
193
        self._gpus_per_core = gpus_per_core
1✔
194
        self._command_line_argument_lst = command_line_argument_lst
1✔
195

196
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
197
        """
198
        Generate the command list for the Srun interface.
199

200
        Args:
201
            command_lst (list[str]): The command list.
202

203
        Returns:
204
            list[str]: The generated command list.
205
        """
206
        command_prepend_lst = generate_slurm_command(
1✔
207
            cores=self._cores,
208
            cwd=self._cwd,
209
            threads_per_core=self._threads_per_core,
210
            gpus_per_core=self._gpus_per_core,
211
            oversubscribe=self._oversubscribe,
212
            command_line_argument_lst=self._command_line_argument_lst,
213
        )
214
        return super().generate_command(
1✔
215
            command_lst=command_prepend_lst + command_lst,
216
        )
217

218

219
def generate_mpiexec_command(cores: int, oversubscribe: bool = False) -> list[str]:
1✔
220
    """
221
    Generate the command list for the MPIExec interface.
222

223
    Args:
224
        cores (int): The number of cores.
225
        oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
226

227
    Returns:
228
        list[str]: The generated command list.
229
    """
230
    if cores == 1:
1✔
231
        return []
1✔
232
    else:
233
        command_prepend_lst = [MPI_COMMAND, "-n", str(cores)]
1✔
234
        if oversubscribe:
1✔
235
            command_prepend_lst += ["--oversubscribe"]
1✔
236
        return command_prepend_lst
1✔
237

238

239
def generate_slurm_command(
1✔
240
    cores: int,
241
    cwd: str,
242
    threads_per_core: int = 1,
243
    gpus_per_core: int = 0,
244
    oversubscribe: bool = False,
245
    command_line_argument_lst: list[str] = [],
246
) -> list[str]:
247
    """
248
    Generate the command list for the SLURM interface.
249

250
    Args:
251
        cores (int): The number of cores.
252
        cwd (str): The current working directory.
253
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
254
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
255
        oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
256
        command_line_argument_lst (list[str], optional): Additional command line arguments. Defaults to [].
257

258
    Returns:
259
        list[str]: The generated command list.
260
    """
261
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
1✔
262
    if cwd is not None:
1✔
263
        command_prepend_lst += ["-D", cwd]
1✔
264
    if threads_per_core > 1:
1✔
265
        command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)]
×
266
    if gpus_per_core > 0:
1✔
267
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
1✔
268
    if oversubscribe:
1✔
269
        command_prepend_lst += ["--oversubscribe"]
1✔
270
    if len(command_line_argument_lst) > 0:
1✔
271
        command_prepend_lst += command_line_argument_lst
1✔
272
    return command_prepend_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc