• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

klahnakoski / mo-threads / 13147620664

05 Feb 2025 12:34AM UTC coverage: 66.004% (-0.05%) from 66.056%
13147620664

push

github

klahnakoski
updates from other projects

1262 of 1912 relevant lines covered (66.0%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.19
/mo_threads/threads.py
1
# encoding: utf-8
2
#
3
#
4
# This Source Code Form is subject to the terms of the Mozilla Public
5
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
6
# You can obtain one at https://www.mozilla.org/en-US/MPL/2.0/.
7
#
8
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
9
#
10
# THIS THREADING MODULE IS PERMEATED BY THE please_stop SIGNAL.
11
# THIS SIGNAL IS IMPORTANT FOR PROPER SIGNALLING WHICH ALLOWS
12
# FOR FAST AND PREDICTABLE SHUTDOWN AND CLEANUP OF THREADS
13

14
import signal as _signal
1✔
15
import sys
1✔
16
import threading
1✔
17
import traceback
1✔
18
from collections import namedtuple
1✔
19
from datetime import datetime, timedelta
1✔
20
from time import sleep
1✔
21

22
from mo_dots import unwraplist, Null, exists
1✔
23
from mo_future import (
1✔
24
    allocate_lock,
25
    get_function_name,
26
    get_ident,
27
    decorate,
28
)
29
from mo_imports import export
1✔
30
from mo_logs import Except, logger, raise_from_none
1✔
31
from mo_logs.exceptions import ERROR
1✔
32
from mo_logs.strings import expand_template
1✔
33

34
from mo_threads.signals import AndSignals, Signal
1✔
35
from mo_threads.till import Till, TIMERS_NAME
1✔
36

37
try:
1✔
38
    # TODO: replace with simple import
39
    from mo_logs import MO_LOGS_EXTRAS
1✔
40
except:
×
41
    MO_LOGS_EXTRAS = "mo-logs-extras"
×
42

43
DEBUG = False
1✔
44
KNOWN_DEBUGGERS = ["pydevd.py"]
1✔
45

46
PLEASE_STOP = "please_stop"  # REQUIRED thread PARAMETER TO SIGNAL STOP
1✔
47
PARENT_THREAD = "parent_thread"  # OPTIONAL PARAMETER TO ASSIGN THREAD TO SOMETHING OTHER THAN CURRENT THREAD
1✔
48
MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39)
1✔
49
DEFAULT_WAIT_TIME = timedelta(minutes=10)
1✔
50
THREAD_TIMEOUT = "Thread {name} timeout"
1✔
51
COVERAGE_COLLECTOR = None  # Detect Coverage.py
1✔
52

53
datetime.strptime("2012-01-01", "%Y-%m-%d")  # http://bugs.python.org/issue7980
1✔
54

55
cprofiler_stats = None  # ACCUMULATION OF STATS FROM ALL THREADS
1✔
56

57
try:
1✔
58
    STDOUT = sys.stdout.buffer
1✔
59
    STDERR = sys.stderr.buffer
1✔
60
    STDIN = sys.stdin.buffer
1✔
61
except Exception:
×
62
    STDOUT = sys.stdout
×
63
    STDERR = sys.stderr
×
64
    STDIN = sys.stdin
×
65

66
if sys.version_info[1] > 9:
1✔
67

68
    def is_daemon(thread):
1✔
69
        return thread.daemon
1✔
70

71

72
else:
73

74
    def is_daemon(thread):
×
75
        return thread.isDaemon()
×
76

77
IN_DEBUGGER = any(debugger in line.filename for line in traceback.extract_stack() for debugger in KNOWN_DEBUGGERS)
1✔
78

79

80
class BaseThread(object):
1✔
81
    __slots__ = [
1✔
82
        "_ident",
83
        "threading_thread",
84
        "child_locker",
85
        "children",
86
        "parent",
87
        "cprofiler",
88
    ]
89

90
    def __init__(self, ident, threading_thread, name=None):
1✔
91
        if threading_thread.name.startswith("Dummy"):
1✔
92
            threading_thread.name = name or f"Unknown Thread {ident}"
×
93
        self._ident = ident
1✔
94
        self.threading_thread = threading_thread
1✔
95
        self.child_locker = allocate_lock()
1✔
96
        self.children = []
1✔
97
        self.parent = MAIN_THREAD
1✔
98
        self.cprofiler = None
1✔
99

100
    @property
1✔
101
    def ident(self):
1✔
102
        return self._ident
1✔
103

104
    @property
1✔
105
    def id(self):
1✔
106
        return self._ident
×
107

108
    @property
1✔
109
    def name(self):
1✔
110
        return self.threading_thread.name
1✔
111

112
    def add_child(self, child):
1✔
113
        if DEBUG:
114
            if child.name == TIMERS_NAME:
115
                logger.error("timer thread should not be added as child")
116
            if child is self:
117
                logger.error("parent thread should not be added as child")
118
            logger.info("adding child {child} to {parent}", child=child.name, parent=self.name)
119

120
        with self.child_locker:
1✔
121
            self.children.append(child)
1✔
122

123
    def remove_child(self, child):
1✔
124
        try:
1✔
125
            with self.child_locker:
1✔
126
                self.children.remove(child)
1✔
127
        except Exception:
1✔
128
            pass
1✔
129

130
    def stop(self):
1✔
131
        pass
1✔
132

133
    def join(self, till=None):
1✔
134
        DEBUG and logger.info("{thread} joining on thread {name|quote}", name=self.name)
135
        thread = self.threading_thread
1✔
136
        try:
1✔
137
            if not is_daemon(thread):
1✔
138
                while not till and thread.is_alive():
1✔
139
                    thread.join(1.0)
×
140
                    sys.stderr.write(f"waiting for {thread.name}")
×
141
        except Exception as cause:
142
            logger.error(thread.name, cause=cause)
143
        finally:
144
            try:
1✔
145
                self.parent.remove_child(self)
1✔
146
            except Exception as cause:
147
                logger.warning("parents of children must have remove_child() method", cause=cause)
148

149
    def __repr__(self):
1✔
150
        return f"BaseThread({self.name})"
×
151

152

153
class AlienThread(BaseThread):
1✔
154
    def __init__(self, ident, threading_thread):
1✔
155
        BaseThread.__init__(self, ident, threading_thread)
1✔
156

157
        if DEBUG:
158
            with ALL_LOCK:
159
                ALL[ident] = self
160
                ALL_THREAD.append(self)
161
                problem = ident in ALL
162
            logger.info("adding {name} ({id}) to ALL", id=self.ident, name=self.name)
163
            if problem:
164
                logger.error("Thread {name|quote} ({id}) already registered", id=self.ident, name=self.name)
165
        else:
166
            with ALL_LOCK:
1✔
167
                ALL[ident] = self
1✔
168

169
    def join(self, till=None):
1✔
170
        try:
1✔
171
            BaseThread.join(self, till=till)
1✔
172
        finally:
173
            if DEBUG:
174
                logger.info("removing {name} ({id}) from ALL", id=self._ident, name=self.name)
175
            with ALL_LOCK:
1✔
176
                if self._ident in ALL:
1✔
177
                    del ALL[self._ident]
1✔
178

179

180
class MainThread(BaseThread):
1✔
181
    def __init__(self):
1✔
182
        BaseThread.__init__(self, get_ident(), threading.current_thread(), "Main Thread")
1✔
183
        self.please_stop = Signal()
1✔
184
        self.stopped = Signal()
1✔
185
        self.stop_logging = logger.stop
1✔
186
        self.timers = None
1✔
187
        self.shutdown_locker = allocate_lock()
1✔
188

189
    def stop(self):
1✔
190
        """
191
        BLOCKS UNTIL ALL KNOWN THREADS, EXCEPT MainThread, HAVE STOPPED
192
        """
193
        self_thread = current_thread()
1✔
194
        if self_thread != MAIN_THREAD:
1✔
195
            logger.error("Only the main thread can call stop()")
×
196
        if self_thread != self:
1✔
197
            logger.error("Only the current thread can call stop()")
×
198

199
        DEBUG and logger.info("Stopping MainThread")
200
        if self.stopped:
1✔
201
            return
×
202

203
        self.please_stop.go()
1✔
204

205
        with self.child_locker:
1✔
206
            children = list(self.children)
1✔
207
        for c in reversed(children):
1✔
208
            DEBUG and c.name and logger.info("Stopping thread {name|quote}", name=c.name)
209
            c.stop()
1✔
210

211
        join_errors = None
1✔
212
        try:
1✔
213
            join_all_threads(children)
1✔
214
        except Exception as cause:
215
            join_errors = Except.wrap(cause)
216
            # REPORT ERRORS BEFORE LOGGING SHUTDOWN
217
            logger.warning("Problem while stopping {name|quote}", name=self.name, cause=cause, log_context=ERROR)
218
        DEBUG and logger.info("All children stopped")
219

220
        with self.shutdown_locker:
1✔
221
            if self.stopped:
1✔
222
                return
×
223
            self.stop_logging()
1✔
224
            self.timers.stop().join()
1✔
225

226
            if cprofiler_stats is not None:
1✔
227
                from mo_threads.profiles import write_profiles
×
228

229
                write_profiles(self.cprofiler)
×
230
            DEBUG and logger.info("Thread {name|quote} now stopped", name=self.name)
231

232
        if DEBUG:
233
            logger.info("MainThread removing {name} ({id}) from ALL", id=self.ident, name=self.name)
234
        with ALL_LOCK:
1✔
235
            if self.ident in ALL:
1✔
236
                del ALL[self.ident]
1✔
237
            residue = list(ALL.values())
1✔
238

239
        DEBUG and logger.info("MainThread has {count} threads left", count=[t.name for t in threading.enumerate()])
240

241
        if residue and (DEBUG or not IN_DEBUGGER):
1✔
242
            sys.stderr.write("Expecting no further threads: " + ", ".join(f"{t.name} ({t.ident})" for t in residue))
×
243
        for t in residue:
1✔
244
            t.stop()
×
245
        join_all_threads(residue)
1✔
246

247
        if join_errors:
1✔
248
            raise Except(
1✔
249
                context=ERROR,
250
                template="Problem while stopping {name|quote}",
251
                name=self.name,
252
                cause=unwraplist(join_errors),
253
            )
254

255
        return self
1✔
256

257

258
EndOfThread = namedtuple("EndOfThread", ["response", "exception"])
1✔
259

260

261
class Thread(BaseThread):
1✔
262
    """
263
    join() ENHANCED TO ALLOW CAPTURE OF CTRL-C, AND RETURN POSSIBLE THREAD EXCEPTIONS
264
    run() ENHANCED TO CAPTURE EXCEPTIONS
265
    """
266

267
    def __init__(self, name, target, *args, parent_thread=None, daemon=False, **kwargs):
1✔
268
        name = str(name)
1✔
269
        threading_thread = threading.Thread(None, self._run, name, daemon=daemon)
1✔
270
        BaseThread.__init__(self, 0, threading_thread, name or f"thread_{object.__hash__(self)}")
1✔
271
        self.target = target
1✔
272
        self.end_of_thread = None
1✔
273
        self.args = args
1✔
274

275
        # ENSURE THERE IS A SHARED please_stop SIGNAL
276
        self.kwargs = dict(kwargs)
1✔
277
        if PLEASE_STOP in self.kwargs:
1✔
278
            self.please_stop = self.kwargs[PLEASE_STOP]
1✔
279
        else:
280
            self.please_stop = self.kwargs[PLEASE_STOP] = Signal(f"please_stop for {self.name}")
1✔
281
        self.joiner_is_waiting = Signal(f"joining with {self.name}")
1✔
282
        self.stopped = Signal(f"stopped signal for {self.name}")
1✔
283
        if parent_thread is None:
1✔
284
            parent_thread = current_thread()
1✔
285
        self.parent = parent_thread
1✔
286
        self.parent.add_child(self)
1✔
287

288
        try:
1✔
289
            # INHERIT LOGGING EXTRAS
290
            if exists(parent_thread):
1✔
291
                setattr(
1✔
292
                    self.threading_thread,
293
                    MO_LOGS_EXTRAS,
294
                    [getattr(parent_thread.threading_thread, MO_LOGS_EXTRAS)[-1]]
295
                )
296
        except:
1✔
297
            pass
1✔
298

299
    def __enter__(self):
1✔
300
        return self
×
301

302
    def __exit__(self, type, value, traceback):
1✔
303
        if isinstance(type, BaseException):
×
304
            self.please_stop.go()
×
305

306
        # TODO: AFTER A WHILE START KILLING THREAD
307
        self.join()
×
308
        self.args = None
×
309
        self.kwargs = None
×
310

311
    def start(self):
1✔
312
        try:
1✔
313
            self.threading_thread.start()
1✔
314
            return self
1✔
315
        except Exception as cause:
316
            logger.error("Can not start thread", cause)
317

318
    def stop(self):
1✔
319
        """
320
        SEND STOP SIGNAL, DO NOT BLOCK
321
        """
322
        with self.child_locker:
1✔
323
            children = list(self.children)
1✔
324
        for c in children:
1✔
325
            DEBUG and c.name and logger.note("Stopping thread {name|quote}", name=c.name)
326
            c.stop()
1✔
327
        self.please_stop.go()
1✔
328

329
        DEBUG and logger.note(
330
            "Thread {name|quote} got request to stop from {thread}", thread=current_thread().name, name=self.name
331
        )
332
        return self
1✔
333

334
    def is_alive(self):
1✔
335
        return not self.stopped
×
336

337
    def _run(self):
1✔
338
        self._ident = ident = get_ident()
1✔
339
        with RegisterThread(thread=self):
1✔
340
            with ALL_LOCK:
×
341
                ALL[ident] = self
×
342
            try:  # deal with join
×
343
                try:  # deal with target exceptions
×
344
                    if self.target is not None:
×
345
                        args, kwargs, self.args, self.kwargs = self.args, self.kwargs, None, None
×
346
                        self.end_of_thread = EndOfThread(self.target(*args, **kwargs), None)
×
347
                except Exception as cause:
348
                    cause = Except.wrap(cause)
349
                    self.end_of_thread = EndOfThread(None, cause)
350
                    with self.parent.child_locker:
351
                        emit_problem = self not in self.parent.children
352
                    if emit_problem:
353
                        # THREAD FAILURES ARE A PROBLEM ONLY IF NO ONE WILL BE JOINING WITH IT
354
                        try:
355
                            logger.error("Problem in thread {name|quote}", name=self.name, cause=cause)
356
                        except Exception as cause:
357
                            sys.stderr.write(f"ERROR in thread: {self.name}  {cause}\n")
358
                finally:
359
                    with self.child_locker:
×
360
                        children, self.children = list(self.children), []
×
361
                    try:
×
362
                        for c in children:
×
363
                            DEBUG and logger.note(f"Stopping thread {c.name}\n")
364
                            c.stop()
×
365

366
                        join_all_threads(children)
×
367
                        del self.target, self.args, self.kwargs
×
368
                        DEBUG and logger.note("thread {name|quote} stopping", name=self.name)
369
                    except Exception as cause:
370
                        DEBUG and logger.warning("problem with thread {name|quote}", cause=cause, name=self.name)
371
                    finally:
372
                        DEBUG and logger.note("thread {name|quote} remove from ALL", name=self.name)
373
                        with ALL_LOCK:
×
374
                            if ident in ALL:
×
375
                                del ALL[ident]
×
376
                        # FROM NOW ON, USING logger WILL RE-REGISTER THIS THREAD
377
                        self.stopped.go()
×
378
            finally:
379
                DEBUG and safe_info("thread {name|quote} is done, wait for join", name=self.name)
380
                # WHERE DO WE PUT THE THREAD RESULT?
381
                # IF NO THREAD JOINS WITH THIS, THEN WHAT DO WE DO WITH THE RESULT?
382
                # HOW LONG DO WE WAIT FOR ANOTHER TO ACCEPT THE RESULT?
383
                #
384
                # WAIT 60seconds, THEN SEND RESULT TO LOGGER
385
                (Till(seconds=60) | self.joiner_is_waiting).wait()
×
386
                if self.joiner_is_waiting:
×
387
                    return
×
388
                res, exp = self.end_of_thread
×
389
                if exp:
×
390
                    # THREAD FAILURES ARE A PROBLEM ONLY IF NO ONE WILL BE JOINING WITH IT
391
                    try:
×
392
                        logger.error(
×
393
                            "Problem in thread {name|quote}", name=self.name, cause=self.end_of_thread.exception,
394
                        )
395
                    except Exception as cause:
396
                        sys.stderr.write(f"ERROR in thread: {self.name} {cause}\n")
397
                        return
398

399
                if res is not None:
×
400
                    logger.warning(
×
401
                        "Thread {thread} returned a response {response|json}, but was not joined",
402
                        thread=self.name,
403
                        response=res,
404
                    )
405
                # THREAD ENDS OK; FORGET ABOUT IT
406
                if isinstance(self.parent, Thread):
×
407
                    # SOMETIMES parent IS NOT A THREAD
408
                    self.parent.remove_child(self)
×
409

410
    def release(self):
1✔
411
        """
412
        RELEASE THREAD TO FEND FOR ITSELF. THREAD CAN EXPECT TO NEVER
413
        JOIN. WILL SEND RESULTS TO LOGS WHEN DONE.
414

415
        PARENT THREAD WILL STILL ENSURE self HAS STOPPED PROPERLY
416
        """
417
        self.joiner_is_waiting.go()
1✔
418
        return self
1✔
419

420
    def join(self, till=None):
1✔
421
        """
422
        RETURN THE RESULT {"response":r, "exception":e} OF THE THREAD EXECUTION (INCLUDING EXCEPTION, IF EXISTS)
423
        """
424
        if self is Thread:
1✔
425
            logger.error("Thread.join() is not a valid call, use t.join()")
×
426

427
        if isinstance(till, (int, float)):
1✔
428
            till = Till(seconds=till)
×
429

430
        with self.child_locker:
1✔
431
            children = list(self.children)
1✔
432
        join_all_threads(children, till=till)
1✔
433

434
        DEBUG and logger.note(
435
            "{parent.name} ({parent.ident}) waiting on thread {child}", parent=current_thread(), child=self.name,
436
        )
437
        self.joiner_is_waiting.go()
1✔
438
        self.stopped.wait(till=till)
1✔
439
        if not self.stopped:
1✔
440
            raise Except(ERROR, template=THREAD_TIMEOUT, params={"name": self.name})
1✔
441
        DEBUG and logger.note(
442
            "{parent.name} ({parent.ident}) DONE waiting on thread {child}", parent=current_thread(), child=self.name,
443
        )
444

445
        try:
1✔
446
            self.parent.remove_child(self)
1✔
447
        except Exception as cause:
448
            logger.warning("parents of children must have remove_child() method", cause=cause)
449

450
        if self.end_of_thread.exception:
1✔
451
            logger.error(
1✔
452
                "Thread {name|quote} did not end well", name=self.name, cause=self.end_of_thread.exception,
453
            )
454
        return self.end_of_thread.response
1✔
455

456
    @staticmethod
1✔
457
    def run(name, target, *args, **kwargs):
1✔
458
        # ENSURE target HAS please_stop ARGUMENT
459
        if get_function_name(target) == "wrapper":
1✔
460
            pass  # GIVE THE override DECORATOR A PASS
×
461
        elif PLEASE_STOP not in target.__code__.co_varnames:
1✔
462
            logger.error("function must have please_stop argument for signalling shutdown")
×
463

464
        return Thread(name, target, *args, **kwargs).start()
1✔
465

466
    @staticmethod
1✔
467
    def current():
1✔
468
        return current_thread()
1✔
469

470
    @staticmethod
1✔
471
    def join_all(threads):
1✔
472
        join_all_threads(threads)
×
473

474
    ####################################################################################################################
475
    ## threading.Thread METHODS
476
    ####################################################################################################################
477
    def is_alive(self):
1✔
478
        return not self.stopped
×
479

480
    @property
1✔
481
    def _is_stopped(self):
1✔
482
        return self.stopped
×
483

484
    @property
1✔
485
    def daemon(self):
1✔
486
        return False
×
487

488
    def isDaemon(self):
1✔
489
        return False
×
490

491
    def getName(self):
1✔
492
        return self.name
×
493

494
    def setName(self, name):
1✔
495
        self.name = name
×
496

497
    def __repr__(self):
1✔
498
        return f"Thread({self.name})"
×
499

500

501
class RegisterThread(object):
1✔
502
    """
503
    A context manager to handle threads spawned by other libs
504
    This will ensure the thread has unregistered, or
505
    has completed before MAIN_THREAD is shutdown
506
    """
507

508
    __slots__ = ["thread"]
1✔
509

510
    def __init__(self, *, thread=None, name=None):
1✔
511
        self.thread = thread or BaseThread(get_ident(), threading.current_thread(), name)
1✔
512

513
    def __enter__(self):
1✔
514
        thread = self.thread
1✔
515
        if cprofiler_stats is not None:
1✔
516
            from mo_threads.profiles import CProfiler
×
517

518
            cprofiler = thread.cprofiler = CProfiler()
×
519
            cprofiler.__enter__()
×
520
        if COVERAGE_COLLECTOR is not None:
1✔
521
            # STARTING TRACER WILL sys.settrace() ITSELF
522
            COVERAGE_COLLECTOR._collectors[-1]._start_tracer()
1✔
523

524
        return self
×
525

526
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
527
        # PYTHON WILL REMOVE GLOBAL VAR BEFORE END-OF-THREAD
528
        thread = self.thread
1✔
529

530
        if cprofiler_stats is not None:
1✔
531
            thread.cprofiler.__exit__(exc_type, exc_val, exc_tb)
×
532
        with thread.child_locker:
1✔
533
            if thread.children:
1✔
534
                logger.error(
×
535
                    "Thread {thread|quote} has not joined with child threads {children|json}",
536
                    children=[c.name for c in thread.children],
537
                    thread=thread.name,
538
                )
539
        DEBUG and safe_info("deregistered {name}", name=thread.name)
540

541

542
def register_thread(func):
1✔
543
    """
544
    Call `with RegisterThread():`
545
    Track this thread to ensure controlled shutdown
546
    """
547

548
    @decorate(func)
×
549
    def output(*args, **kwargs):
×
550
        with RegisterThread():
×
551
            return func(*args, **kwargs)
×
552

553
    return output
×
554

555

556
def _poll_for_exit(please_stop):
1✔
557
    """
558
    /dev/null PIPED TO sys.stdin SPEWS INFINITE LINES, DO NOT POLL AS OFTEN
559
    """
560
    cr_count = 0  # COUNT NUMBER OF BLANK LINES
×
561

562
    try:
×
563
        while not please_stop:
×
564
            # DEBUG and logger.note("inside wait-for-shutdown loop")
565
            if cr_count > 30:
×
566
                (Till(seconds=3) | please_stop).wait()
×
567
            try:
×
568
                line = STDIN.readline()
×
569
            except Exception as cause:
570
                Except.wrap(cause)
571
                if "Bad file descriptor" in cause:
572
                    logger.note("can not read from stdin")
573
                    _wait_for_interrupt(please_stop)
574
                    break
575

576
            # DEBUG and logger.note("read line {line|quote}, count={count}", line=line, count=cr_count)
577
            if not line:
×
578
                cr_count += 1
×
579
            else:
580
                cr_count = -1000000  # NOT /dev/null
×
581

582
            if line.strip() == b"exit":
×
583
                logger.alert("'exit' Detected!  Sending request to stop")
×
584
                please_stop.go()
×
585
                return
×
586
    except Exception as cause:
587
        logger.warning("programming error", cause=cause)
588
    finally:
589
        if please_stop:
×
590
            logger.note("please_stop has been requested")
×
591
        logger.note("done waiting for exit")
×
592

593

594
def current_thread():
1✔
595
    ident = get_ident()
1✔
596
    all_lock = ALL_LOCK
1✔
597
    all = ALL
1✔
598
    main_thread = MAIN_THREAD
1✔
599

600
    with all_lock:
1✔
601
        output = all.get(ident)
1✔
602

603
    if output is None:
1✔
604
        threading_thread = threading.current_thread()
1✔
605
        if threading_thread is main_thread.threading_thread:
1✔
606
            return main_thread
×
607

608
        thread = AlienThread(ident, threading_thread)
1✔
609
        if cprofiler_stats is not None:
1✔
610
            from mo_threads.profiles import CProfiler
×
611

612
            thread.cprofiler = CProfiler()
×
613
            thread.cprofiler.__enter__()
×
614
        main_thread.add_child(thread)
1✔
615
        with all_lock:
1✔
616
            all[ident] = thread
1✔
617

618
        logger.warning(
1✔
619
            "this thread is not known. Register {name|quote} at earliest known entry point.", name=thread.name
620
        )
621
        return thread
1✔
622
    return output
1✔
623

624

625
def safe_info(template, **params):
1✔
626
    print(expand_template(template, params))
×
627

628

629
def join_all_threads(threads, till=None):
1✔
630
    """
631
    Join all threads, and raise any exceptions
632
    :param threads: list of threads to join
633
    :param till: signal to stop waiting for threads
634
    """
635
    threads = list(threads)
1✔
636
    result = [None] * len(threads)
1✔
637
    causes = []
1✔
638
    for i, c in enumerate(threads):
1✔
639
        try:
1✔
640
            DEBUG and logger.note("{parent} joining on thread {name}", parent=current_thread().name, name=c.name)
641
            result[i] = c.join(till=till)
1✔
642
        except Exception as cause:
643
            causes.append(cause)
644
        finally:
645
            DEBUG and logger.note("Joined on thread {name}", name=c.name)
646
    if causes:
1✔
647
        logger.error("At least one thread failed", cause=causes)
1✔
648
    return result
1✔
649

650

651
export("mo_threads.signals", current_thread)
1✔
652

653

654
def _wait_for_interrupt(please_stop):
1✔
655
    DEBUG and logger.note("wait for stop signal")
656
    try:
1✔
657
        # ALTERNATE BETWEEN please_stop CHECK AND SIGINT CHECK
658
        while not please_stop:
1✔
659
            sleep(1)  # LOCKS CAN NOT BE INTERRUPTED, ONLY sleep() CAN
×
660
    finally:
661
        please_stop.go()
1✔
662

663

664
def wait_for_shutdown_signal(
1✔
665
        *,
666
        please_stop=False,  # ASSIGN SIGNAL TO STOP EARLY
667
        allow_exit=False,  # ALLOW "exit" COMMAND ON CONSOLE TO ALSO STOP THE APP
668
        wait_forever=True,  # IGNORE CHILD THREADS, NEVER EXIT.  False => IF NO CHILD THREADS LEFT, THEN EXIT
669
):
670
    """
671
    FOR USE BY PROCESSES THAT NEVER DIE UNLESS EXTERNAL SHUTDOWN IS REQUESTED
672

673
    CALLING THREAD WILL SLEEP UNTIL keyboard interrupt, OR please_stop, OR "exit"
674
    """
675
    main = current_thread()
1✔
676
    if main != MAIN_THREAD:
1✔
677
        logger.error("Only the main thread can sleep forever (waiting for KeyboardInterrupt)")
×
678

679
    if isinstance(please_stop, Signal):
1✔
680
        # MUTUAL SIGNALING MAKES THESE TWO EFFECTIVELY THE SAME SIGNAL
681
        main.please_stop.then(please_stop.go, raise_from_none)
×
682
        please_stop.then(main.please_stop.go)
×
683

684
    if not wait_forever:
1✔
685
        # TRIGGER SIGNAL WHEN ALL CHILDREN THREADS ARE DONE
686
        with main.child_locker:
1✔
687
            pending = list(main.children)
1✔
688
        DEBUG and logger.note("waiting for {children} child threads to finish", children=[c.name for c in pending])
689
        children_done = AndSignals(main.please_stop, len(pending))
1✔
690
        children_done.signal.then(main.please_stop.go)
1✔
691
        for p in pending:
1✔
692
            p.stopped.then(children_done.done)
1✔
693

694
    try:
1✔
695
        if allow_exit:
1✔
696
            threading.Thread(None, _poll_for_exit, args=[main.please_stop], daemon=True).start()
×
697
        _wait_for_interrupt(main.please_stop)
1✔
698
        logger.alert("Stop requested!  Stopping...")
1✔
699
    except KeyboardInterrupt as _:
×
700
        logger.alert("SIGINT Detected!  Stopping...")
×
701
    except SystemExit as _:
×
702
        logger.alert("SIGTERM Detected!  Stopping...")
×
703
    finally:
704
        main.stop()
1✔
705

706

707
def stop_main_thread(signum=0, frame=None, silent=False):
1✔
708
    if not ALL:
1✔
709
        silent or logger.note("All threads have shutdown")
1✔
710
        return
1✔
711

712
    if current_thread() == MAIN_THREAD:
1✔
713
        MAIN_THREAD.stop()
1✔
714
    else:
715
        MAIN_THREAD.please_stop.go()
×
716

717

718
def start_main_thread():
1✔
719
    global MAIN_THREAD
720

721
    try:
1✔
722
        stop_main_thread(silent=True)
1✔
723
    except Exception:
1✔
724
        pass
1✔
725
    MAIN_THREAD = MainThread()
1✔
726
    MAIN_THREAD.shutdown_locker.acquire()
1✔
727

728
    with ALL_LOCK:
1✔
729
        if ALL:
1✔
730
            names = [f"{t.name} ({k})" for k, t in ALL.items()]
×
731
        else:
732
            names = []
1✔
733
            ALL[get_ident()] = MAIN_THREAD
1✔
734

735
    if names:
1✔
736
        if DEBUG:
737
            from mo_files import File
738
            from mo_json import value2json
739

740
            File("all_thread.json").write(value2json([(t.name, t.id) for t in ALL_THREAD], pretty=True))
741
        raise Exception(f"expecting no threads {names}")
×
742

743
    # STARTUP TIMERS
744
    from mo_threads import till
1✔
745

746
    till.enabled = Signal()
1✔
747
    MAIN_THREAD.timers = Thread.run(TIMERS_NAME, till.daemon, parent_thread=Null)
1✔
748
    till.enabled.wait()
1✔
749
    MAIN_THREAD.shutdown_locker.release()
1✔
750

751

752
_signal.signal(_signal.SIGTERM, stop_main_thread)
1✔
753
_signal.signal(_signal.SIGINT, stop_main_thread)
1✔
754
if sys.version_info[:2] < (3, 9):
1✔
755
    def wait_for_join():
×
756
        global current_thread
757

758
        threading.main_thread().join()
×
759
        # after main thread exits, we must stop MAIN_THREAD
760
        # spoof the current_thread() to be MAIN_THREAD
761
        current_thread = lambda: MAIN_THREAD
×
762
        stop_main_thread()
×
763

764

765
    threading.Thread(None, wait_for_join, args=[], daemon=False).start()
×
766
else:
767
    threading._register_atexit(stop_main_thread)
1✔
768

769
MAIN_THREAD = None
1✔
770
ALL_LOCK = allocate_lock()
1✔
771
ALL = dict()
1✔
772
ALL_THREAD = []  # FOR DEBUGGING ONLY
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc