• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 13092289143

01 Feb 2025 08:46PM UTC coverage: 66.056% (-0.3%) from 66.318%
13092289143

push

github

klahnakoski
update lockfile

1263 of 1912 relevant lines covered (66.06%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/mo_threads/python.py
1
# encoding: utf-8
2
#
3
# This Source Code Form is subject to the terms of the Mozilla Public
4
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
5
# You can obtain one at http://mozilla.org/MPL/2.0/.
6
#
7
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
8
#
9

10

11
import os
1✔
12
import sys
1✔
13
from json import dumps as value2json, loads as json2value
1✔
14

15
from mo_dots import to_data, from_data
1✔
16
from mo_future import is_windows
1✔
17
from mo_logs import Except, logger
1✔
18
from mo_threads import Lock, Process, Signal, PLEASE_STOP, Thread, DONE, python_worker
1✔
19

20
# When truthy: passed to the worker Process as `debug=`, and every line read
# from the worker's stdout/stderr is logged by the watcher threads.
DEBUG = False
1✔
21

22

23
class Python(object):
    """
    Proxy for a separate Python interpreter, driven over its stdin/stdout.

    Commands are sent to the worker as one JSON document per call; two
    watcher threads read the worker's stdout (replies and log records) and
    stderr.  Any attribute that is not defined on this class is treated as
    the name of a callable in the worker (see `__getattr__`).
    """

    def __init__(self, name, config, parent_thread=None):
        """
        Start the worker process and block until it reports `{"out":"ok"}`.

        :param name: name given to the Process; also used to name the lock
                     and the two watcher threads
        :param config: settings sent to the worker as its first JSON message;
                       `debug.logs` must not be set
        :param parent_thread: not used by this constructor
        """
        python_exe = sys.executable
        config = to_data(config)
        if config.debug.logs:
            logger.error("not allowed to configure logging on other process")

        logger.info("begin process in dir={dir}", dir=os.getcwd())
        # WINDOWS REQUIRED shell, WHILE LINUX NOT
        shell = is_windows
        python_worker_file = os.path.abspath(python_worker.__file__)
        self.process = Process(
            name,
            [python_exe, "-u", python_worker_file],
            env={**os.environ, "PYTHONPATH": "."},
            debug=DEBUG,
            cwd=os.getcwd(),
            shell=shell,
        )
        # First message to the worker: caller's config combined (via `|`) with
        # the trace/DEBUG settings below
        self.process.stdin.add(value2json(from_data(
            config
            | {
                "debug": {"trace": True},
                "constants": {"mo_threads": {"signals": {"DEBUG": False}, "lock": {"DEBUG": False}}},
            }
        )))
        # Consume stdout until the worker acknowledges startup
        while True:
            line = self.process.stdout.pop()
            if line == PLEASE_STOP:
                # NOTE(review): logger.error is presumably expected to raise here
                # (otherwise this loop continues) - confirm against mo_logs
                logger.error("problem starting python process: STOP detected on stdout")
            if line == '{"out":"ok"}':
                break
            logger.info("waiting to start python: {line}", line=line)
        self.lock = Lock(f"wait for response from {name}")
        self.stop_error = None  # exception captured by stop(), reported by join()
        # Signal for the call in flight; starts as DONE so the first _execute()
        # does not block (assumes DONE is an already-triggered signal - confirm)
        self.done = DONE
        self.response = None  # "out" payload of the latest reply
        self.error = None  # "err" payload of the latest reply

        self.watch_stdout = Thread.run(f"watching stdout for {name}", self._watch_stdout)
        self.watch_stderr = Thread.run(f"watching stderr for {name}", self._watch_stderr)

    def _execute(self, command):
        """
        Send one command to the worker and block until its reply arrives.

        :param command: JSON-serializable command document
        :return: the "out" value of the worker's reply
        Reports through logger.error (with the worker's error as cause) when
        the reply was an "err" (presumably raising - confirm against mo_logs).
        """
        # Wait for the previous call to finish, then claim the channel by
        # swapping in a fresh Signal while holding the lock
        while True:
            self.done.wait()
            with self.lock:
                if self.done:
                    self.done = Signal()
                    break

        self.response = None
        self.error = None
        self.process.stdin.add(value2json(command), force=True)
        # _watch_stdout triggers self.done when an "out" or "err" reply is read
        self.done.wait()
        try:
            if self.error:
                logger.error("problem with process call", cause=Except(**self.error))
            else:
                return self.response
        finally:
            # Never leave a stale reply for the next call
            self.response = None
            self.error = None

    def _watch_stdout(self, please_stop):
        """
        Thread body: read worker stdout lines and dispatch them.

        "log" documents go to the main log, "out"/"err" documents complete the
        call in flight, empty lines are skipped, and lines that are not JSON
        are logged and ignored.

        :param please_stop: signal that ends the loop; also triggered here
                            when PLEASE_STOP is read from stdout
        """
        while not please_stop:
            line = self.process.stdout.pop(till=please_stop)
            DEBUG and logger.info("stdout got {line}", line=line)
            if line == PLEASE_STOP:
                please_stop.go()
                break
            elif not line:
                continue

            try:
                data = to_data(json2value(line))
            except Exception:
                logger.info("non-json line: {line}", line=line)
                continue

            try:
                if "log" in data:
                    logger.main_log.write(**data.log)
                elif "out" in data:
                    self.response = data.out
                    self.done.go()
                elif "err" in data:
                    self.error = data.err
                    self.done.go()
            except Exception as cause:
                logger.error("unexpected problem", cause=cause)
        DEBUG and logger.info("stdout reader is done")

    def _watch_stderr(self, please_stop):
        """
        Thread body: log every line the worker writes to stderr.

        :param please_stop: signal that ends the loop; also triggered here
                            when stderr yields None or PLEASE_STOP
        """
        while not please_stop:
            try:
                line = self.process.stderr.pop(till=please_stop)
                DEBUG and logger.info("stderr got {line}", line=line)
                if line is None or line == PLEASE_STOP:
                    please_stop.go()
                    break
                logger.info(
                    "Error line from {name}({pid}): {line}", line=line, name=self.process.name, pid=self.process.pid,
                )
            except Exception as cause:
                logger.error("could not process line", cause=cause)
        DEBUG and logger.info("stderr reader is done")

    def import_module(self, module_name, var_names=None):
        """
        Ask the worker to import a module.

        :param module_name: name of the module to import
        :param var_names: optional names to import from the module; when None
                          only the module name is sent
        """
        if var_names is None:
            self._execute({"import": module_name})
        else:
            self._execute({"import": {"from": module_name, "vars": var_names}})

    def set(self, var_name, value):
        """
        Send a "set" command assigning `value` to `var_name` in the worker.

        :param var_name: variable name
        :param value: JSON-serializable value
        """
        # Must be a mapping {name: value}; a set literal {name, value} is not
        # JSON-serializable and loses the name/value pairing
        self._execute({"set": {var_name: value}})

    def get(self, var_name):
        """
        :param var_name: name of the variable to read from the worker
        :return: the worker's reply to the "get" command
        """
        return self._execute({"get": var_name})

    def execute_script(self, script):
        """
        :param script: source text sent with the "exec" command
        :return: the worker's reply to the "exec" command
        """
        return self._execute({"exec": script})

    def __getattr__(self, item):
        """
        Treat any unknown attribute as a remote callable named `item`.

        The returned function sends `{item: args}` for positional calls, or
        `{item: kwargs}` for keyword calls; mixing both is reported as an error.

        :raises AttributeError: for dunder names, so protocol probes (copy,
                                pickle, introspection) are not sent to the worker
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(item)

        def output(*args, **kwargs):
            if len(args):
                if kwargs.keys():
                    logger.error("Not allowed to use both args and kwargs")
                return self._execute({item: args})
            else:
                return self._execute({item: kwargs})

        return output

    def stop(self):
        """
        Tell the worker to stop, then stop the process and both watchers.

        Any exception is kept in `self.stop_error` (reported by `join()`)
        instead of being raised here.

        :return: self, so calls can be chained
        """
        try:
            self._execute({"stop": {}})
            self.process.stop()
            self.watch_stdout.stop()
            self.watch_stderr.stop()
        except Exception as cause:
            self.stop_error = cause
        return self

    def join(self):
        """
        Report any error captured by `stop()`, then wait for the process and
        both watcher threads to finish.

        :return: self, so calls can be chained
        """
        if self.stop_error:
            logger.error("problem with stop", cause=self.stop_error)

        self.process.join()
        self.watch_stdout.join()
        self.watch_stderr.join()
        return self
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc