• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12426327282

20 Dec 2024 05:52AM UTC coverage: 95.849% (+0.1%) from 95.704%
12426327282

Pull #519

github

web-flow
Merge 482cb8fef into e9a81f822
Pull Request #519: Add option to write flux log files

5 of 6 new or added lines in 4 files covered. (83.33%)

15 existing lines in 4 files now uncovered.

993 of 1036 relevant lines covered (95.85%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.02
/executorlib/standalone/interactive/spawner.py
1
import subprocess
1✔
2
from abc import ABC
1✔
3
from typing import Optional
1✔
4

5
MPI_COMMAND = "mpiexec"
1✔
6

7

8
class BaseSpawner(ABC):
1✔
9
    def __init__(self, cwd: str, cores: int = 1, openmpi_oversubscribe: bool = False):
1✔
10
        """
11
        Base class for interface implementations.
12

13
        Args:
14
            cwd (str): The current working directory.
15
            cores (int, optional): The number of cores to use. Defaults to 1.
16
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
17
        """
18
        self._cwd = cwd
1✔
19
        self._cores = cores
1✔
20
        self._openmpi_oversubscribe = openmpi_oversubscribe
1✔
21

22
    def bootup(
1✔
23
        self,
24
        command_lst: list[str],
25
    ):
26
        """
27
        Method to start the interface.
28

29
        Args:
30
            command_lst (list[str]): The command list to execute.
31
        """
UNCOV
32
        raise NotImplementedError
×
33

34
    def shutdown(self, wait: bool = True):
1✔
35
        """
36
        Method to shutdown the interface.
37

38
        Args:
39
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
40
        """
UNCOV
41
        raise NotImplementedError
×
42

43
    def poll(self):
1✔
44
        """
45
        Method to check if the interface is running.
46

47
        Returns:
48
            bool: True if the interface is running, False otherwise.
49
        """
UNCOV
50
        raise NotImplementedError
×
51

52

53
class SubprocessSpawner(BaseSpawner):
1✔
54
    def __init__(
1✔
55
        self,
56
        cwd: Optional[str] = None,
57
        cores: int = 1,
58
        openmpi_oversubscribe: bool = False,
59
        threads_per_core: int = 1,
60
    ):
61
        """
62
        Subprocess interface implementation.
63

64
        Args:
65
            cwd (str, optional): The current working directory. Defaults to None.
66
            cores (int, optional): The number of cores to use. Defaults to 1.
67
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
68
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
69
        """
70
        super().__init__(
1✔
71
            cwd=cwd,
72
            cores=cores,
73
            openmpi_oversubscribe=openmpi_oversubscribe,
74
        )
75
        self._process = None
1✔
76
        self._threads_per_core = threads_per_core
1✔
77

78
    def bootup(
1✔
79
        self,
80
        command_lst: list[str],
81
    ):
82
        """
83
        Method to start the subprocess interface.
84

85
        Args:
86
            command_lst (list[str]): The command list to execute.
87
        """
88
        self._process = subprocess.Popen(
1✔
89
            args=self.generate_command(command_lst=command_lst),
90
            cwd=self._cwd,
91
            stdin=subprocess.DEVNULL,
92
        )
93

94
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
95
        """
96
        Method to generate the command list.
97

98
        Args:
99
            command_lst (list[str]): The command list.
100

101
        Returns:
102
            list[str]: The generated command list.
103
        """
104
        return command_lst
1✔
105

106
    def shutdown(self, wait: bool = True):
1✔
107
        """
108
        Method to shutdown the subprocess interface.
109

110
        Args:
111
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
112
        """
113
        self._process.communicate()
1✔
114
        self._process.terminate()
1✔
115
        if wait:
1✔
116
            self._process.wait()
1✔
117
        self._process = None
1✔
118

119
    def poll(self) -> bool:
1✔
120
        """
121
        Method to check if the subprocess interface is running.
122

123
        Returns:
124
            bool: True if the interface is running, False otherwise.
125
        """
126
        return self._process is not None and self._process.poll() is None
1✔
127

128

129
class MpiExecSpawner(SubprocessSpawner):
1✔
130
    def generate_command(self, command_lst: list[str]) -> list[str]:
1✔
131
        """
132
        Generate the command list for the MPIExec interface.
133

134
        Args:
135
            command_lst (list[str]): The command list.
136

137
        Returns:
138
            list[str]: The generated command list.
139
        """
140
        command_prepend_lst = generate_mpiexec_command(
1✔
141
            cores=self._cores,
142
            openmpi_oversubscribe=self._openmpi_oversubscribe,
143
        )
144
        return super().generate_command(
1✔
145
            command_lst=command_prepend_lst + command_lst,
146
        )
147

148

149
def generate_mpiexec_command(
1✔
150
    cores: int, openmpi_oversubscribe: bool = False
151
) -> list[str]:
152
    """
153
    Generate the command list for the MPIExec interface.
154

155
    Args:
156
        cores (int): The number of cores.
157
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
158

159
    Returns:
160
        list[str]: The generated command list.
161
    """
162
    if cores == 1:
1✔
163
        return []
1✔
164
    else:
165
        command_prepend_lst = [MPI_COMMAND, "-n", str(cores)]
1✔
166
        if openmpi_oversubscribe:
1✔
167
            command_prepend_lst += ["--oversubscribe"]
1✔
168
        return command_prepend_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc