• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 13620243696

02 Mar 2025 11:12PM UTC coverage: 66.004% (+0.9%) from 65.115%
13620243696

push

github

klahnakoski
update version number

1262 of 1912 relevant lines covered (66.0%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.4
/mo_threads/processes.py
1
# encoding: utf-8
2
#
3
# This Source Code Form is subject to the terms of the Mozilla Public
4
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
5
# You can obtain one at https://www.mozilla.org/en-US/MPL/2.0/.
6
#
7
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
8
#
9

10

11
import os
1✔
12
import subprocess
1✔
13
from _thread import allocate_lock
1✔
14
from dataclasses import dataclass
1✔
15
from time import time as unix_now
1✔
16

17
from mo_dots import Null
1✔
18
from mo_future import is_windows, utcnow
1✔
19
from mo_logs import logger, strings
1✔
20
from mo_logs.exceptions import Except
1✔
21
from mo_times import Timer, Date
1✔
22

23
from mo_threads.queues import Queue
1✔
24
from mo_threads.signals import Signal
1✔
25
from mo_threads.threads import PLEASE_STOP, Thread, EndOfThread, ALL_LOCK, ALL
1✔
26
from mo_threads.till import Till
1✔
27

28
DEBUG = False
1✔
29

30
next_process_id_locker = allocate_lock()
1✔
31
next_process_id = 0
1✔
32

33

34
@dataclass
1✔
35
class Status:
1✔
36
    last_read: float
1✔
37

38

39
class Process:
1✔
40
    def __init__(
1✔
41
        self,
42
        name,
43
        params,
44
        cwd=None,
45
        env=None,
46
        debug=False,
47
        shell=False,
48
        bufsize=-1,
49
        timeout=2.0,
50
        startup_timeout=10.0,
51
        parent_thread=None,
52
    ):
53
        """
54
        Spawns multiple threads to manage the stdin/stdout/stderr of the child process; communication is done
55
        via proper thread-safe queues of the same name.
56

57
        Since the process is managed and monitored by threads, the main thread is not blocked when the child process
58
        encounters problems
59

60
        :param name: name given to this process
61
        :param params: list of strings for program name and parameters
62
        :param cwd: current working directory
63
        :param env: enviroment variables
64
        :param debug: true to be verbose about stdin/stdout
65
        :param shell: true to run as command line
66
        :param bufsize: if you want to screw stuff up
67
        :param timeout: how long to wait for process stdout/stderr before we consider it dead
68
                        ensure your process emits lines to stay alive
69
        :param startup_timeout: since the process may take a while to start outputting, this is the wait time
70
                        for the first output
71
        """
72
        global next_process_id_locker, next_process_id
73
        with next_process_id_locker:
1✔
74
            self.process_id = next_process_id
1✔
75
            next_process_id += 1
1✔
76

77
        self.debug = debug or DEBUG
1✔
78
        self.name = f"{name} ({self.process_id})"
1✔
79
        self.stopped = Signal(f"stopped signal for {strings.quote(name)}")
1✔
80
        self.please_stop = Signal(f"please stop for {strings.quote(name)}")
1✔
81
        self.second_last_stdin = None
1✔
82
        self.last_stdin = None
1✔
83
        self.stdin = Queue(f"stdin for process {strings.quote(name)}", silent=not self.debug)
1✔
84
        self.stdout = Queue(f"stdout for process {strings.quote(name)}", silent=not self.debug)
1✔
85
        self.stderr = Queue(f"stderr for process {strings.quote(name)}", silent=not self.debug)
1✔
86
        self.timeout = timeout
1✔
87
        self.monitor_period = 0.5
1✔
88

89
        try:
1✔
90
            if cwd == None:
1✔
91
                cwd = os.getcwd()
1✔
92
            else:
93
                cwd = str(cwd)
1✔
94

95
            command = [str(p) for p in params]
1✔
96
            self.debug and logger.info("command: {command}", command=command)
1✔
97
            self.service = service = subprocess.Popen(
1✔
98
                [str(p) for p in params],
99
                stdin=subprocess.PIPE,
100
                stdout=subprocess.PIPE,
101
                stderr=subprocess.PIPE,
102
                bufsize=bufsize,
103
                cwd=cwd,
104
                env={str(k): str(v) for k, v in {**os.environ, **(env or {})}.items()},
105
                shell=shell,
106
            )
107

108
            self.stdout_status = Status(unix_now() + startup_timeout)
1✔
109
            self.stderr_status = Status(unix_now() + startup_timeout)
1✔
110
            self.kill_once = self.kill
1✔
111
            self.children = (
1✔
112
                Thread(
113
                    self.name + " stdin",
114
                    self._writer,
115
                    service.stdin,
116
                    self.stdin,
117
                    please_stop=self.please_stop,
118
                    parent_thread=Null,
119
                ),
120
                Thread(
121
                    self.name + " stdout",
122
                    self._reader,
123
                    "stdout",
124
                    service.stdout,
125
                    self.stdout,
126
                    self.stdout_status,
127
                    please_stop=self.please_stop,
128
                    parent_thread=Null,
129
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
130
                ),
131
                Thread(
132
                    self.name + " stderr",
133
                    self._reader,
134
                    "stderr",
135
                    service.stderr,
136
                    self.stderr,
137
                    self.stderr_status,
138
                    please_stop=self.please_stop,
139
                    parent_thread=Null,
140
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
141
                ),
142
                Thread(self.name + " monitor", self._monitor, please_stop=self.please_stop, parent_thread=self),
143
            )
144
            for child in self.children:
1✔
145
                child.start()
1✔
146
        except Exception as cause:
147
            logger.error("Can not call  dir={cwd}", cwd=cwd, cause=cause)
148

149
        self.debug and logger.info(
1✔
150
            "{process} START: {command}", process=self.name, command=" ".join(map(strings.quote, params)),
151
        )
152
        if not parent_thread:
1✔
153
            parent_thread = Thread.current()
1✔
154
        self.parent_thread = parent_thread
1✔
155
        parent_thread.add_child(self)
1✔
156

157
    def __enter__(self):
1✔
158
        return self
×
159

160
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
161
        self.join(raise_on_error=True)
×
162

163
    def add_child(self, child):
1✔
164
        pass
1✔
165

166
    def stop(self):
1✔
167
        self.please_stop.go()
1✔
168
        return self
1✔
169

170
    def join(self, till=None, raise_on_error=True):
1✔
171
        on_error = logger.error if raise_on_error else logger.warning
1✔
172
        self.stopped.wait(till=till)  # TRIGGERED BY _monitor THREAD WHEN DONE (self.children is None)
1✔
173
        if self.returncode is None:
1✔
174
            self.kill()
1✔
175
            on_error(
1✔
176
                "{process} TIMEOUT\n{stderr}", process=self.name, stderr=list(self.stderr),
177
            )
178
        if self.returncode != 0:
1✔
179
            on_error(
1✔
180
                "{process} FAIL: returncode={code|quote}\n{stderr}",
181
                process=self.name,
182
                code=self.service.returncode,
183
                stderr=list(self.stderr),
184
            )
185
        return self
1✔
186

187
    def remove_child(self, child):
1✔
188
        pass
×
189

190
    @property
1✔
191
    def pid(self):
1✔
192
        return self.service.pid
1✔
193

194
    @property
1✔
195
    def returncode(self):
1✔
196
        return self.service.returncode
1✔
197

198
    def _monitor(self, please_stop):
1✔
199
        with Timer(self.name, verbose=self.debug):
1✔
200
            while not please_stop:
1✔
201
                now = unix_now()
1✔
202
                last_out = max(self.stdout_status.last_read, self.stderr_status.last_read)
1✔
203
                took = now - last_out
1✔
204
                if took > self.timeout:
1✔
205
                    self.kill_once()
×
206
                    logger.warning(
×
207
                        "{last_sent} for {name} last used {last_used} took over {timeout} seconds to respond",
208
                        last_sent=self.second_last_stdin,
209
                        last_used=Date(last_out).format(),
210
                        timeout=self.timeout,
211
                        name=self.name,
212
                    )
213
                    break
×
214
                try:
1✔
215
                    self.service.wait(timeout=self.monitor_period)
1✔
216
                    if self.service.returncode is not None:
1✔
217
                        break
1✔
218
                    self.debug and logger.info("{name} waiting for response", name=self.name)
×
219
                except Exception:
1✔
220
                    # TIMEOUT, CHECK FOR LIVELINESS
221
                    pass
1✔
222

223
        (stdin_thread, stdout_thread, stderr_thread, _), self.children = self.children, ()
1✔
224

225
        # stdout can lock up in windows, so do not wait too long
226
        wait_limit = Till(seconds=1)
1✔
227
        try:
1✔
228
            stdout_thread.join(till=wait_limit)
1✔
229
        except:
1✔
230
            # THREAD LOST ON PIPE.readline()
231
            self.stdout.close()
1✔
232
            stdout_thread.release()
1✔
233
            stdout_thread.end_of_thread = EndOfThread(None, None)
1✔
234
            with ALL_LOCK:
1✔
235
                if stdout_thread.ident in ALL:
1✔
236
                    del ALL[stdout_thread.ident]
1✔
237
            stdout_thread.stopped.go()
1✔
238

239
        try:
1✔
240
            stderr_thread.join(till=wait_limit)
1✔
241
        except:
1✔
242
            # THREAD LOST ON PIPE.readline()
243
            self.stderr.close()
1✔
244
            stderr_thread.release()
1✔
245
            stderr_thread.end_of_thread = EndOfThread(None, None)
1✔
246
            with ALL_LOCK:
1✔
247
                if stderr_thread.ident in ALL:
1✔
248
                    del ALL[stderr_thread.ident]
1✔
249
            stderr_thread.stopped.go()
1✔
250

251
        self.stdin.close()
1✔
252
        stdin_thread.join()
1✔
253

254
        self.stopped.go()
1✔
255
        self.debug and logger.info(
1✔
256
            "{process} STOP: returncode={returncode}", process=self.name, returncode=self.service.returncode,
257
        )
258

259
    def _reader(self, name, pipe, receive, status: Status, please_stop):
1✔
260
        """
261
        MOVE LINES FROM pipe TO receive QUEUE
262
        """
263
        self.debug and logger.info("is reading")
1✔
264
        try:
1✔
265
            while not please_stop and self.service.returncode is None:
1✔
266
                data = pipe.readline()  # THIS MAY NEVER RETURN
1✔
267
                status.last_read = unix_now()
1✔
268
                line = data.decode("utf8").rstrip()
1✔
269
                self.debug and logger.info("got line: {line}", line=line)
1✔
270
                if not data:
1✔
271
                    break
1✔
272
                receive.add(line)
1✔
273
        except Exception as cause:
274
            if not please_stop:
275
                logger.warning("premature read failure", cause=cause)
276
        finally:
277
            self.debug and logger.info("{name} closed", name=name)
1✔
278
            self.please_stop.go()
1✔
279
            receive.close()
1✔
280
            pipe.close()
1✔
281

282
    def _writer(self, pipe, send, please_stop):
1✔
283
        while not please_stop:
1✔
284
            line = send.pop(till=please_stop)
1✔
285
            if line is PLEASE_STOP:
1✔
286
                please_stop.go()
×
287
                self.debug and logger.info("got PLEASE_STOP")
×
288
                break
×
289
            elif line is None:
1✔
290
                continue
1✔
291
            self.second_last_stdin = self.last_stdin
1✔
292
            self.last_stdin = line
1✔
293
            self.debug and logger.info(
1✔
294
                "send line: {line}", process=self.name, line=line.rstrip(),
295
            )
296
            try:
1✔
297
                pipe.write(line.encode("utf8"))
1✔
298
                pipe.write(EOL)
1✔
299
                pipe.flush()
1✔
300
            except Exception as cause:
301
                # HAPPENS WHEN PROCESS IS DONE
302
                self.debug and logger.info("pipe closed")
303
                break
304
        self.debug and logger.info("writer closed")
1✔
305

306
    def kill(self):
1✔
307
        self.kill_once = Null
1✔
308
        try:
1✔
309
            if self.service.returncode is not None:
1✔
310
                return
×
311
            self.service.kill()
1✔
312
            logger.info("{process} was successfully terminated.", process=self.name, stack_depth=1)
1✔
313
        except Exception as cause:
314
            cause = Except.wrap(cause)
315
            if "The operation completed successfully" in cause:
316
                return
317
            if "No such process" in cause:
318
                return
319

320
            logger.warning(
321
                "Failure to kill process {process|quote}", process=self.name, cause=cause,
322
            )
323

324

325
if is_windows:
1✔
326
    EOL = b"\r\n"
×
327
else:
328
    EOL = b"\n"
1✔
329

330

331
def os_path(path):
1✔
332
    """
333
    :return: OS-specific path
334
    """
335
    if path == None:
1✔
336
        return None
1✔
337
    if os.sep == "/":
1✔
338
        return path
1✔
339
    return str(path).lstrip("/")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc