• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12483233294

24 Dec 2024 02:59PM UTC coverage: 96.004% (-0.4%) from 96.367%
12483233294

Pull #535

github

web-flow
Merge 1799b4891 into 5cf3ecc7e
Pull Request #535: Add type checking with mypy

120 of 127 new or added lines in 15 files covered. (94.49%)

3 existing lines in 2 files now uncovered.

1033 of 1076 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.06
/executorlib/standalone/interactive/communication.py
1
import sys
1✔
2
from socket import gethostname
1✔
3
from typing import Optional, Tuple
1✔
4

5
import cloudpickle
1✔
6
import zmq
1✔
7

8

9
class SocketInterface:
    """
    The SocketInterface is an abstraction layer on top of the zero message queue.

    It owns a zmq context and a single zmq.PAIR socket, and optionally a spawner object which is used to start and
    stop the client process that connects to this socket.

    Args:
        spawner (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
    """

    def __init__(self, spawner=None):
        """
        Initialize the SocketInterface.

        Args:
            spawner (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process. When no
                spawner is given, shutdown() only closes the socket and the context.
        """
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.PAIR)
        self._process = None
        self._spawner = spawner

    def send_dict(self, input_dict: dict):
        """
        Send a dictionary with instructions to a connected client process.

        The dictionary is serialized with cloudpickle before it is written to the socket.

        Args:
            input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the
                connected client from listening.
        """
        self._socket.send(cloudpickle.dumps(input_dict))

    def receive_dict(self) -> dict:
        """
        Receive a dictionary from a connected client process.

        Returns:
            dict: the value stored under the key "result" of the response received from the connected client

        Raises:
            Exception: when the response has no "result" key, the exception class named in "error_type" is
                instantiated with the content of "error" and raised.
        """
        # SECURITY NOTE: cloudpickle.loads() executes arbitrary code during deserialization, so the peer on the
        # other end of the socket has to be trusted.
        output = cloudpickle.loads(self._socket.recv())
        if "result" in output:
            return output["result"]
        # "error_type" is presumably the string form of an exception class, e.g. "<class 'ValueError'>"; the text
        # between the first pair of single quotes is taken as the class name.
        error_type = output["error_type"].split("'")[1]
        # SECURITY NOTE: eval() on a string received from the peer. It is kept to preserve the existing behaviour
        # (names are resolved in this module's namespace), but it must only ever be fed by a trusted client.
        raise eval(error_type)(output["error"])

    def send_and_receive_dict(self, input_dict: dict) -> dict:
        """
        Combine both the send_dict() and receive_dict() function in a single call.

        Args:
            input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the
                connected client from listening.

        Returns:
            dict: dictionary with response received from the connected client
        """
        self.send_dict(input_dict=input_dict)
        return self.receive_dict()

    def bind_to_random_port(self) -> int:
        """
        Identify a random port typically in the range from 49152 to 65536 to bind the SocketInterface instance to. Other
        processes can then connect to this port to receive instructions and send results.

        Returns:
            int: port the SocketInterface instance is bound to.
        """
        return self._socket.bind_to_random_port("tcp://*")

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Boot up the client process to connect to the SocketInterface.

        Args:
            command_lst (list): list of strings to start the client process
        """
        self._spawner.bootup(
            command_lst=command_lst,
        )

    def shutdown(self, wait: bool = True):
        """
        Shutdown the SocketInterface and the connected client process.

        The shutdown message is only sent when a spawner is attached and its poll() call reports a running
        process. The socket and the context are closed in any case, so calling shutdown() repeatedly is safe.

        Args:
            wait (bool): Whether to wait for the client process to finish before returning. Default is True.

        Returns:
            The response of the client to the shutdown message, or None when no message was sent.
        """
        result = None
        # getattr() keeps this method usable from __del__ even when __init__ failed half-way, and the explicit
        # None check covers interfaces created without a spawner (the default of __init__).
        spawner = getattr(self, "_spawner", None)
        if spawner is not None and spawner.poll():
            result = self.send_and_receive_dict(
                input_dict={"shutdown": True, "wait": wait}
            )
            spawner.shutdown(wait=wait)
        socket = getattr(self, "_socket", None)
        if socket is not None:
            socket.close()
        context = getattr(self, "_context", None)
        if context is not None:
            context.term()
        self._process = None
        self._socket = None
        self._context = None
        return result

    def __del__(self):
        """
        Destructor for the SocketInterface class.
        Calls the shutdown method with wait=True to ensure proper cleanup.
        """
        self.shutdown(wait=True)
1✔
119

120

121
def interface_bootup(
    command_lst: list[str],
    connections,
    hostname_localhost: Optional[bool] = None,
) -> SocketInterface:
    """
    Start interface for ZMQ communication

    The command list handed in by the caller is not modified; the "--host" and "--zmqport" arguments are appended
    to an internal copy which is then passed to the spawner.

    Args:
        command_lst (list): List of commands as strings
        connections (executorlib.shared.spawner.BaseSpawner): Interface to start parallel process, like MPI, SLURM
                                                                  or Flux
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true. When None, it defaults to True on MacOS and False elsewhere.

    Returns:
         executorlib.shared.communication.SocketInterface: socket interface for zmq communication
    """
    if hostname_localhost is None:
        # Default: only MacOS ("darwin") falls back to localhost.
        hostname_localhost = sys.platform == "darwin"
    # Copy first: "+=" on a list extends it in place, which would leak the extra
    # arguments into the caller's list and accumulate them when it is reused.
    command_lst = list(command_lst)
    if not hostname_localhost:
        command_lst += ["--host", gethostname()]
    interface = SocketInterface(spawner=connections)
    # The port is only known after binding, so it is appended last.
    command_lst += ["--zmqport", str(interface.bind_to_random_port())]
    interface.bootup(command_lst=command_lst)
    return interface
1✔
162

163

164
def interface_connect(host: str, port: str) -> Tuple[zmq.Context, zmq.Socket]:
    """
    Open a zmq.PAIR socket and connect it to a running SocketInterface instance.

    Args:
        host (str): hostname of the host running the SocketInterface instance to connect to.
        port (str): port on the host the SocketInterface instance is running on.

    Returns:
        Tuple[zmq.Context, zmq.Socket]: the newly created context and the socket connected to "tcp://<host>:<port>"
    """
    address = "".join(("tcp://", host, ":", port))
    zmq_context = zmq.Context()
    zmq_socket = zmq_context.socket(zmq.PAIR)
    zmq_socket.connect(address)
    return zmq_context, zmq_socket
1✔
176

177

178
def interface_send(socket: Optional[zmq.Socket], result_dict: dict):
    """
    Serialize a result dictionary with cloudpickle and send it to a SocketInterface instance.

    Nothing is sent when no socket is provided.

    Args:
        socket (zmq.Socket): socket for the connection
        result_dict (dict): dictionary to be sent, supported keys are result, error and error_type.
    """
    if socket is None:
        return
    payload = cloudpickle.dumps(result_dict)
    socket.send(payload)
1✔
188

189

190
def interface_receive(socket: Optional[zmq.Socket]) -> dict:
    """
    Wait for the next message of a SocketInterface instance and deserialize it with cloudpickle.

    Args:
        socket (zmq.Socket): socket for the connection

    Returns:
        dict: the received instructions, or an empty dictionary when no socket is provided
    """
    if socket is None:
        return {}
    message = socket.recv()
    return cloudpickle.loads(message)
×
201

202

203
def interface_shutdown(socket: Optional[zmq.Socket], context: Optional[zmq.Context]):
    """
    Close the socket and terminate the context of a connection to a SocketInterface instance.

    Both objects are left untouched unless both of them are provided.

    Args:
        socket (zmq.Socket): socket for the connection
        context (zmq.sugar.context.Context): context for the connection
    """
    if socket is None or context is None:
        return
    # The socket is closed before the context it belongs to is terminated.
    socket.close()
    context.term()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc