• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 19750076942

27 Nov 2025 11:42PM UTC coverage: 66.231% (+0.2%) from 66.074%
19750076942

push

github

klahnakoski
update lockfile

1267 of 1913 relevant lines covered (66.23%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.19
/mo_threads/threads.py
1
# encoding: utf-8
2
#
3
#
4
# This Source Code Form is subject to the terms of the Mozilla Public
5
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
6
# You can obtain one at https://www.mozilla.org/en-US/MPL/2.0/.
7
#
8
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
9
#
10
# THIS THREADING MODULE IS PERMEATED BY THE please_stop SIGNAL.
11
# THIS SIGNAL IS IMPORTANT FOR PROPER SIGNALLING WHICH ALLOWS
12
# FOR FAST AND PREDICTABLE SHUTDOWN AND CLEANUP OF THREADS
13

14
import signal as _signal
1✔
15
import sys
1✔
16
import threading
1✔
17
import traceback
1✔
18
from collections import namedtuple
1✔
19
from datetime import datetime, timedelta
1✔
20
from time import sleep
1✔
21

22
from mo_dots import unwraplist, Null, exists
1✔
23
from mo_future import (
1✔
24
    allocate_lock,
25
    get_function_name,
26
    get_ident,
27
    decorate,
28
)
29
from mo_imports import export
1✔
30
from mo_logs import Except, logger, raise_from_none
1✔
31
from mo_logs.exceptions import ERROR
1✔
32
from mo_logs.strings import expand_template
1✔
33

34
from mo_threads.signals import AndSignals, Signal
1✔
35
from mo_threads.till import Till, TIMERS_NAME
1✔
36

37
try:
1✔
38
    # TODO: replace with simple import
39
    from mo_logs import MO_LOGS_EXTRAS
1✔
40
except:
×
41
    MO_LOGS_EXTRAS = "mo-logs-extras"
×
42

43
DEBUG = False
1✔
44
KNOWN_DEBUGGERS = ["pydevd.py"]
1✔
45

46
PLEASE_STOP = "please_stop"  # REQUIRED thread PARAMETER TO SIGNAL STOP
1✔
47
PARENT_THREAD = "parent_thread"  # OPTIONAL PARAMETER TO ASSIGN THREAD TO SOMETHING OTHER THAN CURRENT THREAD
1✔
48
MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39)
1✔
49
DEFAULT_WAIT_TIME = timedelta(minutes=10)
1✔
50
THREAD_TIMEOUT = "Thread {name} timeout"
1✔
51
COVERAGE_COLLECTOR = None  # Detect Coverage.py
1✔
52

53
datetime.strptime("2012-01-01", "%Y-%m-%d")  # http://bugs.python.org/issue7980
1✔
54

55
cprofiler_stats = None  # ACCUMULATION OF STATS FROM ALL THREADS
1✔
56

57
try:
1✔
58
    STDOUT = sys.stdout.buffer
1✔
59
    STDERR = sys.stderr.buffer
1✔
60
    STDIN = sys.stdin.buffer
1✔
61
except Exception:
×
62
    STDOUT = sys.stdout
×
63
    STDERR = sys.stderr
×
64
    STDIN = sys.stdin
×
65

66
if sys.version_info[1] > 9:
1✔
67

68
    def is_daemon(thread):
1✔
69
        return thread.daemon
1✔
70

71

72
else:
73

74
    def is_daemon(thread):
×
75
        return thread.isDaemon()
×
76

77

78
IN_DEBUGGER = any(debugger in line.filename for line in traceback.extract_stack() for debugger in KNOWN_DEBUGGERS)
1✔
79

80

81
class BaseThread:
1✔
82
    __slots__ = [
1✔
83
        "_ident",
84
        "threading_thread",
85
        "child_locker",
86
        "children",
87
        "parent",
88
        "cprofiler",
89
    ]
90

91
    def __init__(self, ident, threading_thread, name=None):
1✔
92
        if threading_thread.name.startswith("Dummy"):
1✔
93
            threading_thread.name = name or f"Unknown Thread {ident}"
×
94
        self._ident = ident
1✔
95
        self.threading_thread = threading_thread
1✔
96
        self.child_locker = allocate_lock()
1✔
97
        self.children = []
1✔
98
        self.parent = MAIN_THREAD
1✔
99
        self.cprofiler = None
1✔
100

101
    @property
1✔
102
    def ident(self):
1✔
103
        return self._ident
1✔
104

105
    @property
1✔
106
    def id(self):
1✔
107
        return self._ident
×
108

109
    @property
1✔
110
    def name(self):
1✔
111
        return self.threading_thread.name
1✔
112

113
    def add_child(self, child):
1✔
114
        if DEBUG:
115
            if child.name == TIMERS_NAME:
116
                logger.error("timer thread should not be added as child")
117
            if child is self:
118
                logger.error("parent thread should not be added as child")
119
            logger.info("adding child {child} to {parent}", child=child.name, parent=self.name)
120

121
        with self.child_locker:
1✔
122
            self.children.append(child)
1✔
123

124
    def remove_child(self, child):
1✔
125
        try:
1✔
126
            with self.child_locker:
1✔
127
                self.children.remove(child)
1✔
128
        except Exception:
1✔
129
            pass
1✔
130

131
    def stop(self):
1✔
132
        pass
1✔
133

134
    def join(self, till=None):
1✔
135
        DEBUG and logger.info("{thread} joining on thread {name|quote}", name=self.name)
136
        thread = self.threading_thread
1✔
137
        try:
1✔
138
            if not is_daemon(thread):
1✔
139
                while not till and thread.is_alive():
1✔
140
                    thread.join(1.0)
×
141
                    sys.stderr.write(f"waiting for {thread.name}")
×
142
        except Exception as cause:
143
            logger.error(thread.name, cause=cause)
144
        finally:
145
            try:
1✔
146
                self.parent.remove_child(self)
1✔
147
            except Exception as cause:
148
                logger.warning("parents of children must have remove_child() method", cause=cause)
149

150
    def __repr__(self):
1✔
151
        return f"BaseThread({self.name})"
×
152

153

154
class AlienThread(BaseThread):
1✔
155
    def __init__(self, ident, threading_thread):
1✔
156
        BaseThread.__init__(self, ident, threading_thread)
1✔
157

158
        if DEBUG:
159
            with ALL_LOCK:
160
                ALL[ident] = self
161
                ALL_THREAD.append(self)
162
                problem = ident in ALL
163
            logger.info("adding {name} ({id}) to ALL", id=self.ident, name=self.name)
164
            if problem:
165
                logger.error("Thread {name|quote} ({id}) already registered", id=self.ident, name=self.name)
166
        else:
167
            with ALL_LOCK:
1✔
168
                ALL[ident] = self
1✔
169

170
    def join(self, till=None):
1✔
171
        try:
1✔
172
            BaseThread.join(self, till=till)
1✔
173
        finally:
174
            if DEBUG:
175
                logger.info("removing {name} ({id}) from ALL", id=self._ident, name=self.name)
176
            with ALL_LOCK:
1✔
177
                if self._ident in ALL:
1✔
178
                    del ALL[self._ident]
1✔
179

180

181
class MainThread(BaseThread):
1✔
182
    def __init__(self):
1✔
183
        BaseThread.__init__(self, get_ident(), threading.current_thread(), "Main Thread")
1✔
184
        self.please_stop = Signal()
1✔
185
        self.stopped = Signal()
1✔
186
        self.stop_logging = logger.stop
1✔
187
        self.timers = None
1✔
188
        self.shutdown_locker = allocate_lock()
1✔
189

190
    def stop(self):
1✔
191
        """
192
        BLOCKS UNTIL ALL KNOWN THREADS, EXCEPT MainThread, HAVE STOPPED
193
        """
194
        self_thread = current_thread()
1✔
195
        if self_thread != MAIN_THREAD:
1✔
196
            logger.error("Only the main thread can call stop()")
×
197
        if self_thread != self:
1✔
198
            logger.error("Only the current thread can call stop()")
×
199

200
        DEBUG and logger.info("Stopping MainThread")
201
        if self.stopped:
1✔
202
            return
×
203

204
        self.please_stop.go()
1✔
205

206
        with self.child_locker:
1✔
207
            children = list(self.children)
1✔
208
        for c in reversed(children):
1✔
209
            DEBUG and c.name and logger.info("Stopping thread {name|quote}", name=c.name)
210
            c.stop()
1✔
211

212
        join_errors = None
1✔
213
        try:
1✔
214
            join_all_threads(children)
1✔
215
        except Exception as cause:
216
            join_errors = Except.wrap(cause)
217
            # REPORT ERRORS BEFORE LOGGING SHUTDOWN
218
            logger.warning("Problem while stopping {name|quote}", name=self.name, cause=cause, log_context=ERROR)
219
        DEBUG and logger.info("All children stopped")
220

221
        with self.shutdown_locker:
1✔
222
            if self.stopped:
1✔
223
                return
×
224
            self.stop_logging()
1✔
225
            self.timers.stop().join()
1✔
226

227
            if cprofiler_stats is not None:
1✔
228
                from mo_threads.profiles import write_profiles
×
229

230
                write_profiles(self.cprofiler)
×
231
            DEBUG and logger.info("Thread {name|quote} now stopped", name=self.name)
232

233
        if DEBUG:
234
            logger.info("MainThread removing {name} ({id}) from ALL", id=self.ident, name=self.name)
235
        with ALL_LOCK:
1✔
236
            if self.ident in ALL:
1✔
237
                del ALL[self.ident]
1✔
238
            residue = list(ALL.values())
1✔
239

240
        DEBUG and logger.info("MainThread has {count} threads left", count=[t.name for t in threading.enumerate()])
241

242
        if residue and (DEBUG or not IN_DEBUGGER):
1✔
243
            sys.stderr.write("Expecting no further threads: " + ", ".join(f"{t.name} ({t.ident})" for t in residue))
×
244
        for t in residue:
1✔
245
            t.stop()
×
246
        join_all_threads(residue)
1✔
247

248
        if join_errors:
1✔
249
            raise Except(
1✔
250
                context=ERROR,
251
                template="Problem while stopping {name|quote}",
252
                name=self.name,
253
                cause=unwraplist(join_errors),
254
            )
255

256
        return self
1✔
257

258

259
EndOfThread = namedtuple("EndOfThread", ["response", "exception"])
1✔
260

261

262
class Thread(BaseThread):
1✔
263
    """
264
    join() ENHANCED TO ALLOW CAPTURE OF CTRL-C, AND RETURN POSSIBLE THREAD EXCEPTIONS
265
    run() ENHANCED TO CAPTURE EXCEPTIONS
266
    """
267

268
    def __init__(self, name, target, *args, parent_thread=None, daemon=False, **kwargs):
1✔
269
        name = str(name)
1✔
270
        threading_thread = threading.Thread(None, self._run, name, daemon=daemon)
1✔
271
        BaseThread.__init__(self, 0, threading_thread, name or f"thread_{object.__hash__(self)}")
1✔
272
        self.target = target
1✔
273
        self.end_of_thread = None
1✔
274
        self.args = args
1✔
275

276
        # ENSURE THERE IS A SHARED please_stop SIGNAL
277
        self.kwargs = dict(kwargs)
1✔
278
        if PLEASE_STOP in self.kwargs:
1✔
279
            self.please_stop = self.kwargs[PLEASE_STOP]
1✔
280
        else:
281
            self.please_stop = self.kwargs[PLEASE_STOP] = Signal(f"please_stop for {self.name}")
1✔
282
        self.joiner_is_waiting = Signal(f"joining with {self.name}")
1✔
283
        self.stopped = Signal(f"stopped signal for {self.name}")
1✔
284
        if parent_thread is None:
1✔
285
            parent_thread = current_thread()
1✔
286
        self.parent = parent_thread
1✔
287
        self.parent.add_child(self)
1✔
288

289
        try:
1✔
290
            # INHERIT LOGGING EXTRAS
291
            if exists(parent_thread):
1✔
292
                setattr(
1✔
293
                    self.threading_thread,
294
                    MO_LOGS_EXTRAS,
295
                    [getattr(parent_thread.threading_thread, MO_LOGS_EXTRAS)[-1]],
296
                )
297
        except:
1✔
298
            pass
1✔
299

300
    def __enter__(self):
1✔
301
        return self
×
302

303
    def __exit__(self, type, value, traceback):
1✔
304
        if isinstance(type, BaseException):
×
305
            self.please_stop.go()
×
306

307
        # TODO: AFTER A WHILE START KILLING THREAD
308
        self.join()
×
309
        self.args = None
×
310
        self.kwargs = None
×
311

312
    def start(self):
1✔
313
        try:
1✔
314
            self.threading_thread.start()
1✔
315
            return self
1✔
316
        except Exception as cause:
317
            logger.error("Can not start thread", cause)
318

319
    def stop(self):
1✔
320
        """
321
        SEND STOP SIGNAL, DO NOT BLOCK
322
        """
323
        with self.child_locker:
1✔
324
            children = list(self.children)
1✔
325
        for c in children:
1✔
326
            DEBUG and c.name and logger.note("Stopping thread {name|quote}", name=c.name)
327
            c.stop()
1✔
328
        self.please_stop.go()
1✔
329

330
        DEBUG and logger.note(
331
            "Thread {name|quote} got request to stop from {thread}", thread=current_thread().name, name=self.name
332
        )
333
        return self
1✔
334

335
    def is_alive(self):
1✔
336
        return not self.stopped
×
337

338
    def _run(self):
1✔
339
        self._ident = ident = get_ident()
1✔
340
        with RegisterThread(thread=self):
1✔
341
            with ALL_LOCK:
×
342
                ALL[ident] = self
×
343
            try:  # deal with join
×
344
                try:  # deal with target exceptions
×
345
                    if self.target is not None:
×
346
                        args, kwargs, self.args, self.kwargs = self.args, self.kwargs, None, None
×
347
                        self.end_of_thread = EndOfThread(self.target(*args, **kwargs), None)
×
348
                except Exception as cause:
349
                    cause = Except.wrap(cause)
350
                    self.end_of_thread = EndOfThread(None, cause)
351
                    with self.parent.child_locker:
352
                        emit_problem = self not in self.parent.children
353
                    if emit_problem:
354
                        # THREAD FAILURES ARE A PROBLEM ONLY IF NO ONE WILL BE JOINING WITH IT
355
                        try:
356
                            logger.error("Problem in thread {name|quote}", name=self.name, cause=cause)
357
                        except Exception as cause:
358
                            sys.stderr.write(f"ERROR in thread: {self.name}  {cause}\n")
359
                finally:
360
                    with self.child_locker:
×
361
                        children, self.children = list(self.children), []
×
362
                    try:
×
363
                        for c in children:
×
364
                            DEBUG and logger.note(f"Stopping thread {c.name}\n")
365
                            c.stop()
×
366

367
                        join_all_threads(children)
×
368
                        del self.target, self.args, self.kwargs
×
369
                        DEBUG and logger.note("thread {name|quote} stopping", name=self.name)
370
                    except Exception as cause:
371
                        DEBUG and logger.warning("problem with thread {name|quote}", cause=cause, name=self.name)
372
                    finally:
373
                        DEBUG and logger.note("thread {name|quote} remove from ALL", name=self.name)
374
                        with ALL_LOCK:
×
375
                            if ident in ALL:
×
376
                                del ALL[ident]
×
377
                        # FROM NOW ON, USING logger WILL RE-REGISTER THIS THREAD
378
                        self.stopped.go()
×
379
            finally:
380
                DEBUG and safe_info("thread {name|quote} is done, wait for join", name=self.name)
381
                # WHERE DO WE PUT THE THREAD RESULT?
382
                # IF NO THREAD JOINS WITH THIS, THEN WHAT DO WE DO WITH THE RESULT?
383
                # HOW LONG DO WE WAIT FOR ANOTHER TO ACCEPT THE RESULT?
384
                #
385
                # WAIT 60seconds, THEN SEND RESULT TO LOGGER
386
                (Till(seconds=60) | self.joiner_is_waiting).wait()
×
387
                if self.joiner_is_waiting:
×
388
                    return
×
389
                res, exp = self.end_of_thread
×
390
                if exp:
×
391
                    # THREAD FAILURES ARE A PROBLEM ONLY IF NO ONE WILL BE JOINING WITH IT
392
                    try:
×
393
                        logger.error(
×
394
                            "Problem in thread {name|quote}", name=self.name, cause=self.end_of_thread.exception,
395
                        )
396
                    except Exception as cause:
397
                        sys.stderr.write(f"ERROR in thread: {self.name} {cause}\n")
398
                        return
399

400
                if res is not None:
×
401
                    logger.warning(
×
402
                        "Thread {thread} returned a response {response|json}, but was not joined",
403
                        thread=self.name,
404
                        response=res,
405
                    )
406
                # THREAD ENDS OK; FORGET ABOUT IT
407
                if isinstance(self.parent, Thread):
×
408
                    # SOMETIMES parent IS NOT A THREAD
409
                    self.parent.remove_child(self)
×
410

411
    def release(self):
1✔
412
        """
413
        RELEASE THREAD TO FEND FOR ITSELF. THREAD CAN EXPECT TO NEVER
414
        JOIN. WILL SEND RESULTS TO LOGS WHEN DONE.
415

416
        PARENT THREAD WILL STILL ENSURE self HAS STOPPED PROPERLY
417
        """
418
        self.joiner_is_waiting.go()
1✔
419
        return self
1✔
420

421
    def join(self, till=None):
1✔
422
        """
423
        RETURN THE RESULT {"response":r, "exception":e} OF THE THREAD EXECUTION (INCLUDING EXCEPTION, IF EXISTS)
424
        """
425
        if self is Thread:
1✔
426
            logger.error("Thread.join() is not a valid call, use t.join()")
×
427

428
        if isinstance(till, (int, float)):
1✔
429
            till = Till(seconds=till)
×
430

431
        with self.child_locker:
1✔
432
            children = list(self.children)
1✔
433
        join_all_threads(children, till=till)
1✔
434

435
        DEBUG and logger.note(
436
            "{parent.name} ({parent.ident}) waiting on thread {child}", parent=current_thread(), child=self.name,
437
        )
438
        self.joiner_is_waiting.go()
1✔
439
        self.stopped.wait(till=till)
1✔
440
        if not self.stopped:
1✔
441
            raise Except(ERROR, template=THREAD_TIMEOUT, params={"name": self.name})
1✔
442
        DEBUG and logger.note(
443
            "{parent.name} ({parent.ident}) DONE waiting on thread {child}", parent=current_thread(), child=self.name,
444
        )
445

446
        try:
1✔
447
            self.parent.remove_child(self)
1✔
448
        except Exception as cause:
449
            logger.warning("parents of children must have remove_child() method", cause=cause)
450

451
        if self.end_of_thread.exception:
1✔
452
            logger.error(
1✔
453
                "Thread {name|quote} did not end well", name=self.name, cause=self.end_of_thread.exception,
454
            )
455
        return self.end_of_thread.response
1✔
456

457
    @staticmethod
1✔
458
    def run(name, target, *args, **kwargs):
1✔
459
        # ENSURE target HAS please_stop ARGUMENT
460
        if get_function_name(target) == "wrapper":
1✔
461
            pass  # GIVE THE override DECORATOR A PASS
×
462
        elif PLEASE_STOP not in target.__code__.co_varnames:
1✔
463
            logger.error("function must have please_stop argument for signalling shutdown")
×
464

465
        return Thread(name, target, *args, **kwargs).start()
1✔
466

467
    @staticmethod
1✔
468
    def current():
1✔
469
        return current_thread()
1✔
470

471
    @staticmethod
1✔
472
    def join_all(threads):
1✔
473
        join_all_threads(threads)
×
474

475
    ####################################################################################################################
476
    ## threading.Thread METHODS
477
    ####################################################################################################################
478
    def is_alive(self):
1✔
479
        return not self.stopped
×
480

481
    @property
1✔
482
    def _is_stopped(self):
1✔
483
        return self.stopped
×
484

485
    @property
1✔
486
    def daemon(self):
1✔
487
        return False
×
488

489
    def isDaemon(self):
1✔
490
        return False
×
491

492
    def getName(self):
1✔
493
        return self.name
×
494

495
    def setName(self, name):
1✔
496
        self.name = name
×
497

498
    def __repr__(self):
1✔
499
        return f"Thread({self.name})"
×
500

501

502
class RegisterThread:
1✔
503
    """
504
    A context manager to handle threads spawned by other libs
505
    This will ensure the thread has unregistered, or
506
    has completed before MAIN_THREAD is shutdown
507
    """
508

509
    __slots__ = ["thread"]
1✔
510

511
    def __init__(self, *, thread=None, name=None):
1✔
512
        self.thread = thread or BaseThread(get_ident(), threading.current_thread(), name)
1✔
513

514
    def __enter__(self):
1✔
515
        thread = self.thread
1✔
516
        if cprofiler_stats is not None:
1✔
517
            from mo_threads.profiles import CProfiler
×
518

519
            cprofiler = thread.cprofiler = CProfiler()
×
520
            cprofiler.__enter__()
×
521
        if COVERAGE_COLLECTOR is not None:
1✔
522
            # STARTING TRACER WILL sys.settrace() ITSELF
523
            COVERAGE_COLLECTOR._collectors[-1]._start_tracer()
1✔
524

525
        return self
×
526

527
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
528
        # PYTHON WILL REMOVE GLOBAL VAR BEFORE END-OF-THREAD
529
        thread = self.thread
1✔
530

531
        if cprofiler_stats is not None:
1✔
532
            thread.cprofiler.__exit__(exc_type, exc_val, exc_tb)
×
533
        with thread.child_locker:
1✔
534
            if thread.children:
1✔
535
                logger.error(
×
536
                    "Thread {thread|quote} has not joined with child threads {children|json}",
537
                    children=[c.name for c in thread.children],
538
                    thread=thread.name,
539
                )
540
        DEBUG and safe_info("deregistered {name}", name=thread.name)
541

542

543
def register_thread(func):
1✔
544
    """
545
    Call `with RegisterThread():`
546
    Track this thread to ensure controlled shutdown
547
    """
548

549
    @decorate(func)
×
550
    def output(*args, **kwargs):
×
551
        with RegisterThread():
×
552
            return func(*args, **kwargs)
×
553

554
    return output
×
555

556

557
def _poll_for_exit(please_stop):
1✔
558
    """
559
    /dev/null PIPED TO sys.stdin SPEWS INFINITE LINES, DO NOT POLL AS OFTEN
560
    """
561
    cr_count = 0  # COUNT NUMBER OF BLANK LINES
×
562

563
    try:
×
564
        while not please_stop:
×
565
            # DEBUG and logger.note("inside wait-for-shutdown loop")
566
            if cr_count > 30:
×
567
                (Till(seconds=3) | please_stop).wait()
×
568
            try:
×
569
                line = STDIN.readline()
×
570
            except Exception as cause:
571
                Except.wrap(cause)
572
                if "Bad file descriptor" in cause:
573
                    logger.note("can not read from stdin")
574
                    _wait_for_interrupt(please_stop)
575
                    break
576

577
            # DEBUG and logger.note("read line {line|quote}, count={count}", line=line, count=cr_count)
578
            if not line:
×
579
                cr_count += 1
×
580
            else:
581
                cr_count = -1000000  # NOT /dev/null
×
582

583
            if line.strip() == b"exit":
×
584
                logger.alert("'exit' Detected!  Sending request to stop")
×
585
                please_stop.go()
×
586
                return
×
587
    except Exception as cause:
588
        logger.warning("programming error", cause=cause)
589
    finally:
590
        if please_stop:
×
591
            logger.note("please_stop has been requested")
×
592
        logger.note("done waiting for exit")
×
593

594

595
def current_thread():
1✔
596
    ident = get_ident()
1✔
597
    all_lock = ALL_LOCK
1✔
598
    all = ALL
1✔
599
    main_thread = MAIN_THREAD
1✔
600

601
    with all_lock:
1✔
602
        output = all.get(ident)
1✔
603

604
    if output is None:
1✔
605
        threading_thread = threading.current_thread()
1✔
606
        if threading_thread is main_thread.threading_thread:
1✔
607
            return main_thread
×
608

609
        thread = AlienThread(ident, threading_thread)
1✔
610
        if cprofiler_stats is not None:
1✔
611
            from mo_threads.profiles import CProfiler
×
612

613
            thread.cprofiler = CProfiler()
×
614
            thread.cprofiler.__enter__()
×
615
        main_thread.add_child(thread)
1✔
616
        with all_lock:
1✔
617
            all[ident] = thread
1✔
618

619
        logger.warning(
1✔
620
            "this thread is not known. Register {name|quote} at earliest known entry point.", name=thread.name
621
        )
622
        return thread
1✔
623
    return output
1✔
624

625

626
def safe_info(template, **params):
1✔
627
    print(expand_template(template, params))
×
628

629

630
def join_all_threads(threads, till=None):
1✔
631
    """
632
    Join all threads, and raise any exceptions
633
    :param threads: list of threads to join
634
    :param till: signal to stop waiting for threads
635
    """
636
    threads = list(threads)
1✔
637
    result = [None] * len(threads)
1✔
638
    causes = []
1✔
639
    for i, c in enumerate(threads):
1✔
640
        try:
1✔
641
            DEBUG and logger.note("{parent} joining on thread {name}", parent=current_thread().name, name=c.name)
642
            result[i] = c.join(till=till)
1✔
643
        except Exception as cause:
644
            causes.append(cause)
645
        finally:
646
            DEBUG and logger.note("Joined on thread {name}", name=c.name)
647
    if causes:
1✔
648
        logger.error("At least one thread failed", cause=causes)
1✔
649
    return result
1✔
650

651

652
export("mo_threads.signals", current_thread)
1✔
653

654

655
def _wait_for_interrupt(please_stop):
1✔
656
    DEBUG and logger.note("wait for stop signal")
657
    try:
1✔
658
        # ALTERNATE BETWEEN please_stop CHECK AND SIGINT CHECK
659
        while not please_stop:
1✔
660
            sleep(1)  # LOCKS CAN NOT BE INTERRUPTED, ONLY sleep() CAN
×
661
    finally:
662
        please_stop.go()
1✔
663

664

665
def wait_for_shutdown_signal(
1✔
666
    *,
667
    please_stop=False,  # ASSIGN SIGNAL TO STOP EARLY
668
    allow_exit=False,  # ALLOW "exit" COMMAND ON CONSOLE TO ALSO STOP THE APP
669
    wait_forever=True,  # IGNORE CHILD THREADS, NEVER EXIT.  False => IF NO CHILD THREADS LEFT, THEN EXIT
670
):
671
    """
672
    FOR USE BY PROCESSES THAT NEVER DIE UNLESS EXTERNAL SHUTDOWN IS REQUESTED
673

674
    CALLING THREAD WILL SLEEP UNTIL keyboard interrupt, OR please_stop, OR "exit"
675
    """
676
    main = current_thread()
1✔
677
    if main != MAIN_THREAD:
1✔
678
        logger.error("Only the main thread can sleep forever (waiting for KeyboardInterrupt)")
×
679

680
    if isinstance(please_stop, Signal):
1✔
681
        # MUTUAL SIGNALING MAKES THESE TWO EFFECTIVELY THE SAME SIGNAL
682
        main.please_stop.then(please_stop.go, raise_from_none)
×
683
        please_stop.then(main.please_stop.go)
×
684

685
    if not wait_forever:
1✔
686
        # TRIGGER SIGNAL WHEN ALL CHILDREN THREADS ARE DONE
687
        with main.child_locker:
1✔
688
            pending = list(main.children)
1✔
689
        DEBUG and logger.note("waiting for {children} child threads to finish", children=[c.name for c in pending])
690
        children_done = AndSignals(main.please_stop, len(pending))
1✔
691
        children_done.signal.then(main.please_stop.go)
1✔
692
        for p in pending:
1✔
693
            p.stopped.then(children_done.done)
1✔
694

695
    try:
1✔
696
        if allow_exit:
1✔
697
            threading.Thread(None, _poll_for_exit, args=[main.please_stop], daemon=True).start()
×
698
        _wait_for_interrupt(main.please_stop)
1✔
699
        logger.alert("Stop requested!  Stopping...")
1✔
700
    except KeyboardInterrupt as _:
×
701
        logger.alert("SIGINT Detected!  Stopping...")
×
702
    except SystemExit as _:
×
703
        logger.alert("SIGTERM Detected!  Stopping...")
×
704
    finally:
705
        main.stop()
1✔
706

707

708
def stop_main_thread(signum=0, frame=None, silent=False):
1✔
709
    if not ALL:
1✔
710
        silent or logger.note("All threads have shutdown")
1✔
711
        return
1✔
712

713
    if current_thread() == MAIN_THREAD:
1✔
714
        MAIN_THREAD.stop()
1✔
715
    else:
716
        MAIN_THREAD.please_stop.go()
×
717

718

719
def start_main_thread():
1✔
720
    global MAIN_THREAD
721

722
    try:
1✔
723
        stop_main_thread(silent=True)
1✔
724
    except Exception:
1✔
725
        pass
1✔
726
    MAIN_THREAD = MainThread()
1✔
727
    MAIN_THREAD.shutdown_locker.acquire()
1✔
728

729
    with ALL_LOCK:
1✔
730
        if ALL:
1✔
731
            names = [f"{t.name} ({k})" for k, t in ALL.items()]
×
732
        else:
733
            names = []
1✔
734
            ALL[get_ident()] = MAIN_THREAD
1✔
735

736
    if names:
1✔
737
        if DEBUG:
738
            from mo_files import File
739
            from mo_json import value2json
740

741
            File("all_thread.json").write(value2json([(t.name, t.id) for t in ALL_THREAD], pretty=True))
742
        raise Exception(f"expecting no threads {names}")
×
743

744
    # STARTUP TIMERS
745
    from mo_threads import till
1✔
746

747
    till.enabled = Signal()
1✔
748
    MAIN_THREAD.timers = Thread.run(TIMERS_NAME, till.daemon, parent_thread=Null)
1✔
749
    till.enabled.wait()
1✔
750
    MAIN_THREAD.shutdown_locker.release()
1✔
751

752

753
_signal.signal(_signal.SIGTERM, stop_main_thread)
1✔
754
_signal.signal(_signal.SIGINT, stop_main_thread)
1✔
755
if sys.version_info[:2] < (3, 9):
1✔
756

757
    def wait_for_join():
×
758
        global current_thread
759

760
        threading.main_thread().join()
×
761
        # after main thread exits, we must stop MAIN_THREAD
762
        # spoof the current_thread() to be MAIN_THREAD
763
        current_thread = lambda: MAIN_THREAD
×
764
        stop_main_thread()
×
765

766
    threading.Thread(None, wait_for_join, args=[], daemon=False).start()
×
767
else:
768
    threading._register_atexit(stop_main_thread)
1✔
769

770
MAIN_THREAD = None
1✔
771
ALL_LOCK = allocate_lock()
1✔
772
ALL = dict()
1✔
773
ALL_THREAD = []  # FOR DEBUGGING ONLY
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc