• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 13148523232

05 Feb 2025 01:48AM UTC coverage: 66.056% (-0.2%) from 66.266%
13148523232

push

github

klahnakoski
update lockfile

1263 of 1912 relevant lines covered (66.06%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.4
/mo_threads/processes.py
1
# encoding: utf-8
2
#
3
# This Source Code Form is subject to the terms of the Mozilla Public
4
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
5
# You can obtain one at https://www.mozilla.org/en-US/MPL/2.0/.
6
#
7
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
8
#
9

10

11
import os
1✔
12
import subprocess
1✔
13
from _thread import allocate_lock
1✔
14
from dataclasses import dataclass
1✔
15
from time import time as unix_now
1✔
16

17
from mo_dots import Null
1✔
18
from mo_future import is_windows, utcnow
1✔
19
from mo_logs import logger, strings
1✔
20
from mo_logs.exceptions import Except
1✔
21
from mo_times import Timer, Date
1✔
22

23
from mo_threads.queues import Queue
1✔
24
from mo_threads.signals import Signal
1✔
25
from mo_threads.threads import PLEASE_STOP, Thread, EndOfThread, ALL_LOCK, ALL
1✔
26
from mo_threads.till import Till
1✔
27

28
DEBUG = False
1✔
29

30
next_process_id_locker = allocate_lock()
1✔
31
next_process_id = 0
1✔
32

33

34
@dataclass
1✔
35
class Status:
1✔
36
    last_read: float
1✔
37

38

39
class Process(object):
1✔
40
    def __init__(
1✔
41
        self,
42
        name,
43
        params,
44
        cwd=None,
45
        env=None,
46
        debug=False,
47
        shell=False,
48
        bufsize=-1,
49
        timeout=2.0,
50
        startup_timeout=10.0,
51
        parent_thread=None,
52
    ):
53
        """
54
        Spawns multiple threads to manage the stdin/stdout/stderr of the child process; communication is done
55
        via proper thread-safe queues of the same name.
56

57
        Since the process is managed and monitored by threads, the main thread is not blocked when the child process
58
        encounters problems
59

60
        :param name: name given to this process
61
        :param params: list of strings for program name and parameters
62
        :param cwd: current working directory
63
        :param env: enviroment variables
64
        :param debug: true to be verbose about stdin/stdout
65
        :param shell: true to run as command line
66
        :param bufsize: if you want to screw stuff up
67
        :param timeout: how long to wait for process stdout/stderr before we consider it dead
68
                        ensure your process emits lines to stay alive
69
        :param startup_timeout: since the process may take a while to start outputting, this is the wait time
70
                        for the first output
71
        """
72
        global next_process_id_locker, next_process_id
73
        with next_process_id_locker:
1✔
74
            self.process_id = next_process_id
1✔
75
            next_process_id += 1
1✔
76

77
        self.debug = debug or DEBUG
1✔
78
        self.name = f"{name} ({self.process_id})"
1✔
79
        self.stopped = Signal(f"stopped signal for {strings.quote(name)}")
1✔
80
        self.please_stop = Signal(f"please stop for {strings.quote(name)}")
1✔
81
        self.second_last_stdin = None
1✔
82
        self.last_stdin = None
1✔
83
        self.stdin = Queue(f"stdin for process {strings.quote(name)}", silent=not self.debug)
1✔
84
        self.stdout = Queue(f"stdout for process {strings.quote(name)}", silent=not self.debug)
1✔
85
        self.stderr = Queue(f"stderr for process {strings.quote(name)}", silent=not self.debug)
1✔
86
        self.timeout = timeout
1✔
87
        self.monitor_period = 0.5
1✔
88

89
        try:
1✔
90
            if cwd == None:
1✔
91
                cwd = os.getcwd()
1✔
92
            else:
93
                cwd = str(cwd)
1✔
94

95
            command = [str(p) for p in params]
1✔
96
            self.debug and logger.info("command: {command}", command=command)
1✔
97
            self.service = service = subprocess.Popen(
1✔
98
                [str(p) for p in params],
99
                stdin=subprocess.PIPE,
100
                stdout=subprocess.PIPE,
101
                stderr=subprocess.PIPE,
102
                bufsize=bufsize,
103
                cwd=cwd,
104
                env={str(k): str(v) for k, v in {**os.environ, **(env or {})}.items()},
105
                shell=shell,
106
            )
107

108
            self.stdout_status = Status(unix_now() + startup_timeout)
1✔
109
            self.stderr_status = Status(unix_now() + startup_timeout)
1✔
110
            self.kill_once = self.kill
1✔
111
            self.children = (
1✔
112
                Thread(
113
                    self.name + " stdin",
114
                    self._writer,
115
                    service.stdin,
116
                    self.stdin,
117
                    please_stop=self.please_stop,
118
                    parent_thread=Null,
119
                ),
120
                Thread(
121
                    self.name + " stdout",
122
                    self._reader,
123
                    "stdout",
124
                    service.stdout,
125
                    self.stdout,
126
                    self.stdout_status,
127
                    please_stop=self.please_stop,
128
                    parent_thread=Null,
129
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
130
                ),
131
                Thread(
132
                    self.name + " stderr",
133
                    self._reader,
134
                    "stderr",
135
                    service.stderr,
136
                    self.stderr,
137
                    self.stderr_status,
138
                    please_stop=self.please_stop,
139
                    parent_thread=Null,
140
                    daemon=True,  # MIGHT LOCKUP, ONLY WAY TO KILL IT
141
                ),
142
                Thread(self.name + " monitor", self._monitor, please_stop=self.please_stop, parent_thread=self),
143
            )
144
            for child in self.children:
1✔
145
                child.start()
1✔
146
        except Exception as cause:
147
            logger.error("Can not call  dir={cwd}", cwd=cwd, cause=cause)
148

149
        self.debug and logger.info(
1✔
150
            "{process} START: {command}", process=self.name, command=" ".join(map(strings.quote, params)),
151
        )
152
        if not parent_thread:
1✔
153
            parent_thread = Thread.current()
1✔
154
        self.parent_thread = parent_thread
1✔
155
        parent_thread.add_child(self)
1✔
156

157
    def __enter__(self):
1✔
158
        return self
×
159

160
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
161
        self.join(raise_on_error=True)
×
162

163
    def add_child(self, child):
1✔
164
        pass
1✔
165

166
    def stop(self):
1✔
167
        self.please_stop.go()
1✔
168
        return self
1✔
169

170
    def join(self, till=None, raise_on_error=True):
1✔
171
        on_error = logger.error if raise_on_error else logger.warning
1✔
172
        self.stopped.wait(till=till)  # TRIGGERED BY _monitor THREAD WHEN DONE (self.children is None)
1✔
173
        if self.returncode is None:
1✔
174
            self.kill()
1✔
175
            on_error(
1✔
176
                "{process} TIMEOUT\n{stderr}",
177
                process=self.name,
178
                stderr=list(self.stderr),
179
            )
180
        if self.returncode != 0:
1✔
181
            on_error(
1✔
182
                "{process} FAIL: returncode={code|quote}\n{stderr}",
183
                process=self.name,
184
                code=self.service.returncode,
185
                stderr=list(self.stderr),
186
            )
187
        return self
1✔
188

189
    def remove_child(self, child):
1✔
190
        pass
×
191

192
    @property
1✔
193
    def pid(self):
1✔
194
        return self.service.pid
1✔
195

196
    @property
1✔
197
    def returncode(self):
1✔
198
        return self.service.returncode
1✔
199

200
    def _monitor(self, please_stop):
1✔
201
        with Timer(self.name, verbose=self.debug):
1✔
202
            while not please_stop:
1✔
203
                now = unix_now()
1✔
204
                last_out = max(self.stdout_status.last_read, self.stderr_status.last_read)
1✔
205
                took = now - last_out
1✔
206
                if took > self.timeout:
1✔
207
                    self.kill_once()
×
208
                    logger.warning(
×
209
                        "{last_sent} for {name} last used {last_used} took over {timeout} seconds to respond",
210
                        last_sent=self.second_last_stdin,
211
                        last_used=Date(last_out).format(),
212
                        timeout=self.timeout,
213
                        name=self.name,
214
                    )
215
                    break
×
216
                try:
1✔
217
                    self.service.wait(timeout=self.monitor_period)
1✔
218
                    if self.service.returncode is not None:
1✔
219
                        break
1✔
220
                    self.debug and logger.info("{name} waiting for response", name=self.name)
×
221
                except Exception:
1✔
222
                    # TIMEOUT, CHECK FOR LIVELINESS
223
                    pass
1✔
224

225
        (stdin_thread, stdout_thread, stderr_thread, _), self.children = self.children, ()
1✔
226

227
        # stdout can lock up in windows, so do not wait too long
228
        wait_limit = Till(seconds=1)
1✔
229
        try:
1✔
230
            stdout_thread.join(till=wait_limit)
1✔
231
        except:
1✔
232
            # THREAD LOST ON PIPE.readline()
233
            self.stdout.close()
1✔
234
            stdout_thread.release()
1✔
235
            stdout_thread.end_of_thread = EndOfThread(None, None)
1✔
236
            with ALL_LOCK:
1✔
237
                if stdout_thread.ident in ALL:
1✔
238
                    del ALL[stdout_thread.ident]
1✔
239
            stdout_thread.stopped.go()
1✔
240

241
        try:
1✔
242
            stderr_thread.join(till=wait_limit)
1✔
243
        except:
1✔
244
            # THREAD LOST ON PIPE.readline()
245
            self.stderr.close()
1✔
246
            stderr_thread.release()
1✔
247
            stderr_thread.end_of_thread = EndOfThread(None, None)
1✔
248
            with ALL_LOCK:
1✔
249
                if stderr_thread.ident in ALL:
1✔
250
                    del ALL[stderr_thread.ident]
1✔
251
            stderr_thread.stopped.go()
1✔
252

253
        self.stdin.close()
1✔
254
        stdin_thread.join()
1✔
255

256
        self.stopped.go()
1✔
257
        self.debug and logger.info(
1✔
258
            "{process} STOP: returncode={returncode}", process=self.name, returncode=self.service.returncode,
259
        )
260

261
    def _reader(self, name, pipe, receive, status: Status, please_stop):
1✔
262
        """
263
        MOVE LINES FROM pipe TO receive QUEUE
264
        """
265
        self.debug and logger.info("is reading")
1✔
266
        try:
1✔
267
            while not please_stop and self.service.returncode is None:
1✔
268
                data = pipe.readline()  # THIS MAY NEVER RETURN
1✔
269
                status.last_read = unix_now()
1✔
270
                line = data.decode("utf8").rstrip()
1✔
271
                self.debug and logger.info("got line: {line}", line=line)
1✔
272
                if not data:
1✔
273
                    break
1✔
274
                receive.add(line)
1✔
275
        except Exception as cause:
276
            if not please_stop:
277
                logger.warning("premature read failure", cause=cause)
278
        finally:
279
            self.debug and logger.info("{name} closed", name=name)
1✔
280
            self.please_stop.go()
1✔
281
            receive.close()
1✔
282
            pipe.close()
1✔
283

284
    def _writer(self, pipe, send, please_stop):
1✔
285
        while not please_stop:
1✔
286
            line = send.pop(till=please_stop)
1✔
287
            if line is PLEASE_STOP:
1✔
288
                please_stop.go()
×
289
                self.debug and logger.info("got PLEASE_STOP")
×
290
                break
×
291
            elif line is None:
1✔
292
                continue
1✔
293
            self.second_last_stdin = self.last_stdin
1✔
294
            self.last_stdin = line
1✔
295
            self.debug and logger.info(
1✔
296
                "send line: {line}", process=self.name, line=line.rstrip(),
297
            )
298
            try:
1✔
299
                pipe.write(line.encode("utf8"))
1✔
300
                pipe.write(EOL)
1✔
301
                pipe.flush()
1✔
302
            except Exception as cause:
303
                # HAPPENS WHEN PROCESS IS DONE
304
                self.debug and logger.info("pipe closed")
305
                break
306
        self.debug and logger.info("writer closed")
1✔
307

308
    def kill(self):
1✔
309
        self.kill_once = Null
1✔
310
        try:
1✔
311
            if self.service.returncode is not None:
1✔
312
                return
×
313
            self.service.kill()
1✔
314
            logger.info("{process} was successfully terminated.", process=self.name, stack_depth=1)
1✔
315
        except Exception as cause:
316
            cause = Except.wrap(cause)
317
            if "The operation completed successfully" in cause:
318
                return
319
            if "No such process" in cause:
320
                return
321

322
            logger.warning(
323
                "Failure to kill process {process|quote}", process=self.name, cause=cause,
324
            )
325

326

327
if is_windows:
1✔
328
    EOL = b"\r\n"
×
329
else:
330
    EOL = b"\n"
1✔
331

332

333
def os_path(path):
1✔
334
    """
335
    :return: OS-specific path
336
    """
337
    if path == None:
1✔
338
        return None
1✔
339
    if os.sep == "/":
1✔
340
        return path
1✔
341
    return str(path).lstrip("/")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc