• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 13886447739

16 Mar 2025 07:00PM UTC coverage: 66.074% (-0.2%) from 66.283%
13886447739

push

github

klahnakoski
update lockfile

1264 of 1913 relevant lines covered (66.07%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.44
/mo_threads/processes.py
1
# encoding: utf-8
2
#
3
# This Source Code Form is subject to the terms of the Mozilla Public
4
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
5
# You can obtain one at https://www.mozilla.org/en-US/MPL/2.0/.
6
#
7
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
8
#
9

10

11
import os
1✔
12
import subprocess
1✔
13
from _thread import allocate_lock
1✔
14
from dataclasses import dataclass
1✔
15
from time import time as unix_now
1✔
16

17
from mo_dots import Null
1✔
18
from mo_future import is_windows
1✔
19
from mo_logs import logger, strings
1✔
20
from mo_logs.exceptions import Except
1✔
21
from mo_times import Timer, Date
1✔
22

23
from mo_threads.queues import Queue
1✔
24
from mo_threads.signals import Signal
1✔
25
from mo_threads.threads import PLEASE_STOP, Thread, EndOfThread, ALL_LOCK, ALL
1✔
26
from mo_threads.till import Till
1✔
27

28
# Module-wide verbosity switch; a Process is verbose when either this or its own `debug` argument is truthy
DEBUG = False

# Counter handed out to each new Process as its `process_id`;
# the lock serializes the read-and-increment done in Process.__init__
next_process_id = 0
next_process_id_locker = allocate_lock()
32

33

34
@dataclass
class Status:
    """
    Mutable record shared between a pipe-reading thread and the monitor thread.
    """

    # unix timestamp (seconds) of the most recent read from the pipe
    last_read: float
37

38

39
class Process:
    """
    A child process whose stdin/stdout/stderr are serviced by dedicated threads.

    Lines are exchanged through the `stdin`, `stdout` and `stderr` queues. A monitor thread
    kills the child when it has been silent for longer than `timeout`, and triggers the
    `stopped` signal once all the servicing threads have been dealt with.
    """

    def __init__(
        self,
        name,
        params,
        cwd=None,
        env=None,
        debug=False,
        shell=False,
        bufsize=-1,
        timeout=2.0,
        startup_timeout=10.0,
        parent_thread=None,
    ):
        """
        Spawns multiple threads to manage the stdin/stdout/stderr of the child process; communication is done
        via proper thread-safe queues of the same name.

        Since the process is managed and monitored by threads, the main thread is not blocked when the child process
        encounters problems

        :param name: name given to this process
        :param params: list of strings for program name and parameters
        :param cwd: current working directory (defaults to the current directory of this process)
        :param env: environment variables, added on top of (and overriding) os.environ
        :param debug: true to be verbose about stdin/stdout
        :param shell: true to run as command line
        :param bufsize: if you want to screw stuff up
        :param timeout: how long to wait for process stdout/stderr before we consider it dead
                        ensure your process emits lines to stay alive
        :param startup_timeout: since the process may take a while to start outputting, this is the wait time
                        for the first output
        :param parent_thread: thread this process is registered with as a child (defaults to Thread.current())
        """
        global next_process_id_locker, next_process_id
        with next_process_id_locker:
            self.process_id = next_process_id
            next_process_id += 1

        self.debug = debug or DEBUG
        self.name = f"{name} ({self.process_id})"
        self.stopped = Signal(f"stopped signal for {strings.quote(name)}")
        self.please_stop = Signal(f"please stop for {strings.quote(name)}")
        self.second_last_stdin = None
        self.last_stdin = None
        self.stdin = Queue(f"stdin for process {strings.quote(name)}", silent=not self.debug)
        self.stdout = Queue(f"stdout for process {strings.quote(name)}", silent=not self.debug)
        self.stderr = Queue(f"stderr for process {strings.quote(name)}", silent=not self.debug)
        self.timeout = timeout
        self.monitor_period = 0.5  # seconds between liveliness checks in _monitor

        try:
            # `== None` (not `is None`) is kept as found; presumably so mo_dots Null is matched too - verify
            if cwd == None:
                cwd = os.getcwd()
            else:
                cwd = str(cwd)

            command = [str(p) for p in params]
            self.debug and logger.info("command: {command}", command=command)
            self.service = service = subprocess.Popen(
                command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=bufsize,
                cwd=cwd,
                env={str(k): str(v) for k, v in {**os.environ, **(env or {})}.items()},
                shell=shell,
            )

            # PRETEND THE LAST READ IS IN THE FUTURE, SO THE MONITOR GIVES THE PROCESS TIME TO START
            self.stdout_status = Status(unix_now() + startup_timeout)
            self.stderr_status = Status(unix_now() + startup_timeout)
            self.kill_once = self.kill
            # ORDER MATTERS: _monitor UNPACKS THIS TUPLE AS (stdin, stdout, stderr, monitor)
            self.children = (
                Thread(
                    self.name + " stdin",
                    self._writer,
                    service.stdin,
                    self.stdin,
                    please_stop=self.please_stop,
                    parent_thread=Null,
                ),
                Thread(
                    self.name + " stdout",
                    self._reader,
                    "stdout",
                    service.stdout,
                    self.stdout,
                    self.stdout_status,
                    please_stop=self.please_stop,
                    parent_thread=Null,
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
                ),
                Thread(
                    self.name + " stderr",
                    self._reader,
                    "stderr",
                    service.stderr,
                    self.stderr,
                    self.stderr_status,
                    please_stop=self.please_stop,
                    parent_thread=Null,
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
                ),
                Thread(self.name + " monitor", self._monitor, please_stop=self.please_stop, parent_thread=self),
            )
            for child in self.children:
                child.start()
        except Exception as cause:
            logger.error("Can not call  dir={cwd}", cwd=cwd, cause=cause)

        self.debug and logger.info(
            "{process} START: {command}", process=self.name, command=" ".join(map(strings.quote, params)),
        )
        if not parent_thread:
            parent_thread = Thread.current()
        self.parent_thread = parent_thread
        parent_thread.add_child(self)

    def __enter__(self):
        """
        :return: this process, for use in a `with` block
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Wait for the process to finish when the `with` block ends; problems are reported as errors
        """
        self.join(raise_on_error=True)

    def add_child(self, child):
        """
        Does nothing; present because the monitor thread is created with `parent_thread=self`
        """
        pass

    def stop(self):
        """
        Request that the process, and the threads servicing it, stop

        :return: self
        """
        self.please_stop.go()
        return self

    def join(self, till=None, raise_on_error=True):
        """
        Wait for the process to stop, then report a timeout or a non-zero return code

        :param till: optional signal limiting how long to wait for the `stopped` signal
        :param raise_on_error: report problems with logger.error when true (presumably raising - verify
                        against mo_logs), otherwise with logger.warning
        :return: self
        """
        on_error = logger.error if raise_on_error else logger.warning
        self.stopped.wait(till=till)  # TRIGGERED BY _monitor THREAD WHEN DONE (self.children is None)
        self.parent_thread.remove_child(self)
        if self.returncode is None:
            self.kill()
            on_error(
                "{process} TIMEOUT\n{stderr}", process=self.name, stderr=list(self.stderr),
            )
        elif self.returncode != 0:
            # `elif`: A TIMEOUT IS REPORTED ONCE, NOT A SECOND TIME AS A FAILURE WITH returncode=None
            on_error(
                "{process} FAIL: returncode={code|quote}\n{stderr}",
                process=self.name,
                code=self.service.returncode,
                stderr=list(self.stderr),
            )
        return self

    def remove_child(self, child):
        """
        Does nothing; counterpart of add_child
        """
        pass

    @property
    def pid(self):
        """
        :return: process id of the child process
        """
        return self.service.pid

    @property
    def returncode(self):
        """
        :return: return code of the child process, or None when it has not been collected yet
        """
        return self.service.returncode

    def _monitor(self, please_stop):
        """
        Watch the child process until it ends, is asked to stop, or stays silent for too long;
        then wind down the stdin/stdout/stderr threads and trigger the `stopped` signal

        :param please_stop: signal checked between liveliness checks
        """
        with Timer(self.name, verbose=self.debug):
            while not please_stop:
                now = unix_now()
                last_out = max(self.stdout_status.last_read, self.stderr_status.last_read)
                took = now - last_out
                if took > self.timeout:
                    self.kill_once()
                    logger.warning(
                        "{last_sent} for {name} last used {last_used} took over {timeout} seconds to respond",
                        last_sent=self.second_last_stdin,
                        last_used=Date(last_out).format(),
                        timeout=self.timeout,
                        name=self.name,
                    )
                    break
                try:
                    self.service.wait(timeout=self.monitor_period)
                    if self.service.returncode is not None:
                        break
                    self.debug and logger.info("{name} waiting for response", name=self.name)
                except Exception:
                    # TIMEOUT, CHECK FOR LIVELINESS
                    pass

        (stdin_thread, stdout_thread, stderr_thread, _), self.children = self.children, ()

        # stdout can lock up in windows, so do not wait too long
        wait_limit = Till(seconds=1)
        self._join_reader(stdout_thread, self.stdout, wait_limit)
        self._join_reader(stderr_thread, self.stderr, wait_limit)

        self.stdin.close()
        stdin_thread.join()

        self.stopped.go()
        self.debug and logger.info(
            "{process} STOP: returncode={returncode}", process=self.name, returncode=self.service.returncode,
        )

    def _join_reader(self, reader_thread, output, wait_limit):
        """
        Join a reader thread; when the join fails, abandon the thread and mark it as stopped

        :param reader_thread: the stdout or stderr thread
        :param output: the queue that thread feeds; closed when the thread is abandoned
        :param wait_limit: signal limiting how long to wait for the thread
        """
        try:
            reader_thread.join(till=wait_limit)
        except Exception:
            # THREAD LOST ON PIPE.readline()
            output.close()
            reader_thread.release()
            reader_thread.end_of_thread = EndOfThread(None, None)
            with ALL_LOCK:
                if reader_thread.ident in ALL:
                    del ALL[reader_thread.ident]
            reader_thread.stopped.go()

    def _reader(self, name, pipe, receive, status: "Status", please_stop):
        """
        MOVE LINES FROM pipe TO receive QUEUE

        :param name: label used in the debug log ("stdout" or "stderr")
        :param pipe: binary pipe to read lines from; closed on exit
        :param receive: queue given each decoded line (trailing whitespace removed); closed on exit
        :param status: `last_read` is set to the current time after every read, including end-of-file
        :param please_stop: signal checked before each read
        """
        self.debug and logger.info("is reading")
        try:
            while not please_stop and self.service.returncode is None:
                data = pipe.readline()  # THIS MAY NEVER RETURN
                status.last_read = unix_now()
                if not data:
                    # END OF FILE
                    break
                # errors="replace": A STRAY NON-UTF8 BYTE FROM THE CHILD MUST NOT END THE READER
                # (AND, VIA THE finally BELOW, STOP THE WHOLE PROCESS)
                line = data.decode("utf8", errors="replace").rstrip()
                self.debug and logger.info("got line: {line}", line=line)
                receive.add(line)
        except Exception as cause:
            if not please_stop:
                logger.warning("premature read failure", cause=cause)
        finally:
            self.debug and logger.info("{name} closed", name=name)
            # ONE PIPE ENDING IS TAKEN AS THE CUE TO STOP ALL THREADS OF THIS PROCESS
            self.please_stop.go()
            receive.close()
            pipe.close()

    def _writer(self, pipe, send, please_stop):
        """
        MOVE LINES FROM send QUEUE TO pipe, EACH FOLLOWED BY EOL

        :param pipe: binary pipe connected to stdin of the child process
        :param send: queue of text lines to send; PLEASE_STOP ends the writer
        :param please_stop: signal that ends the writer (also interrupts waiting on the queue)
        """
        while not please_stop:
            line = send.pop(till=please_stop)
            if line is PLEASE_STOP:
                please_stop.go()
                self.debug and logger.info("got PLEASE_STOP")
                break
            elif line is None:
                continue
            # REMEMBER THE LAST TWO LINES SENT; THE MONITOR REPORTS second_last_stdin ON TIMEOUT
            self.second_last_stdin = self.last_stdin
            self.last_stdin = line
            self.debug and logger.info(
                "send line: {line}", process=self.name, line=line.rstrip(),
            )
            try:
                pipe.write(line.encode("utf8"))
                pipe.write(EOL)
                pipe.flush()
            except Exception:
                # HAPPENS WHEN PROCESS IS DONE
                self.debug and logger.info("pipe closed")
                break
        self.debug and logger.info("writer closed")

    def kill(self):
        """
        Kill the child process if it has no return code yet; failures are logged as warnings
        """
        self.kill_once = Null  # FURTHER CALLS TO kill_once() DO NOTHING
        try:
            if self.service.returncode is not None:
                return
            self.service.kill()
            logger.info("{process} was successfully terminated.", process=self.name, stack_depth=1)
        except Exception as cause:
            cause = Except.wrap(cause)
            # THESE MESSAGES ARE TREATED AS "ALREADY GONE", NOT AS FAILURES
            if "The operation completed successfully" in cause:
                return
            if "No such process" in cause:
                return

            logger.warning(
                "Failure to kill process {process|quote}", process=self.name, cause=cause,
            )
324

325

326
# Line terminator (bytes) appended to every line written to the stdin of a child process
EOL = b"\r\n" if is_windows else b"\n"
330

331

332
def os_path(path):
    """
    Convert a path to the form used by the host operating system

    :param path: the path to convert; may be None
    :return: None when `path` equals None; `path` unchanged when the OS separator is "/";
             otherwise `str(path)` with every leading "/" removed
    """
    # equality (not identity) kept as found; presumably so mo_dots Null is matched too - verify
    if path == None:
        return None
    uses_forward_slash = os.sep == "/"
    return path if uses_forward_slash else str(path).lstrip("/")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc