• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scope3data / scope3ai-py / 12592526412

03 Jan 2025 03:58AM UTC coverage: 94.047% (+13.5%) from 80.557%
12592526412

push

github

8422c6
kevdevg
feat: adding async tests

1406 of 1495 relevant lines covered (94.05%)

1.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/scope3ai/worker.py
1
import logging
2✔
2
import queue
2✔
3
import threading
2✔
4
from time import sleep
2✔
5
from typing import Callable, Optional
2✔
6

7
from time import monotonic
2✔
8

9
# Module-level logger used by the background worker, registered under the
# "scope3ai.worker" name.
logger = logging.getLogger("scope3ai.worker")
2✔
10

11

12
class BackgroundWorker:
    """Run submitted callbacks sequentially on a lazily started daemon thread.

    Callbacks are handed over through a bounded ``queue.Queue``; when the
    queue is full, ``submit`` reports failure instead of blocking. The worker
    can be paused/resumed, flushed with a timeout, and killed.
    """

    # Sentinel put on the queue to tell the worker loop to exit.
    STOP_WORKER = object()

    def __init__(self, size: int) -> None:
        """Create a worker whose queue holds at most ``size`` pending items."""
        self._size = size
        self._queue = queue.Queue(maxsize=size)
        # Serializes start/kill/flush so the thread handle is swapped safely.
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        # Set means "running"; cleared means the loop waits before each job.
        self._pause_event = threading.Event()
        self._pause_event.set()

    @property
    def is_alive(self) -> bool:
        """Return True only if a worker thread exists and is running."""
        # Explicit None check so the result is a real bool, never None.
        worker = self._thread
        return worker is not None and worker.is_alive()

    def _ensure_thread(self) -> None:
        """Start the worker thread if it is not currently alive."""
        if not self.is_alive:
            self.start()

    def submit(self, callback: Callable[[], None]) -> bool:
        """Queue ``callback`` for execution on the worker thread.

        Returns:
            True if the callback was queued, False if the queue was full.
        """
        self._ensure_thread()
        try:
            self._queue.put_nowait(callback)
        except queue.Full:
            return False
        return True

    def start(self) -> None:
        """Start the daemon worker thread unless one is already alive."""
        with self._lock:
            if self.is_alive:
                return
            logger.debug("Starting background worker")
            self._thread = threading.Thread(
                target=self._run,
                name="scope3ai.BackgroundWorker",
                daemon=True,
            )
            try:
                self._thread.start()
            except RuntimeError:
                # Thread.start() failed; leave the worker in the
                # "not started" state so a later submit can retry.
                logger.debug(
                    "Failed to start background worker", exc_info=True
                )
                self._thread = None

    def kill(self) -> None:
        """Ask the worker loop to stop and detach from its thread and queue.

        The stop sentinel is enqueued without blocking; if the queue is full
        the sentinel is not delivered. In every case where a thread handle
        exists, the handle is dropped and a fresh empty queue is installed.
        """
        logger.debug("Got kill signal")
        # queue.ShutDown only exists on newer Python versions; an empty tuple
        # in an except clause matches nothing, keeping older versions safe.
        shutdown_error = getattr(queue, "ShutDown", ())
        with self._lock:
            if not self._thread:
                return
            try:
                self._queue.put_nowait(self.STOP_WORKER)
            except queue.Full:
                logger.debug("Failed to kill worker")
            except shutdown_error:
                logger.debug("Worker already shutdown")
            self._thread = None
            self._queue = queue.Queue(maxsize=self._size)

    def flush(self, timeout: float = 5) -> None:
        """Wait up to ``timeout`` seconds for queued callbacks to finish.

        Does nothing if the worker thread is not alive.
        """
        logger.debug("Got flush signal")
        with self._lock:
            if not self.is_alive:
                return
            self._wait_flush(timeout)
        logger.debug("Worker flushed")

    def _wait_flush(self, timeout: float) -> None:
        """Join the queue in two phases, logging what is still pending.

        A short first wait (at most 0.1s) avoids log noise for quick
        flushes; the remainder of ``timeout`` is spent in a second wait.
        """
        initial_timeout = min(0.1, timeout)
        if self._timed_queue_join(initial_timeout):
            return

        pending = self._queue.qsize() + 1
        logger.debug("%s event(s) pending on flush", pending)

        if not self._timed_queue_join(timeout - initial_timeout):
            pending = self._queue.qsize() + 1
            logger.error("flush timed out, dropped %s events", pending)

    def _timed_queue_join(self, timeout: float) -> bool:
        """Wait until every queued task is done or ``timeout`` elapses.

        Returns:
            True if the queue has no unfinished tasks, False on timeout.
        """
        deadline = monotonic() + timeout
        # Local alias named so it does not shadow the ``queue`` module.
        pending_queue = self._queue

        # The condition is its own context manager: acquire on entry,
        # release on exit, including on early return.
        with pending_queue.all_tasks_done:
            while pending_queue.unfinished_tasks:
                delay = deadline - monotonic()
                if delay <= 0:
                    return False
                pending_queue.all_tasks_done.wait(timeout=delay)
            return True

    def _run(self) -> None:
        """Worker loop: execute queued callbacks until the stop sentinel."""
        # Bind the queue once: kill() replaces self._queue, and this loop
        # must keep draining the queue it was started with.
        q = self._queue
        while True:
            callback = q.get()
            try:
                if callback is self.STOP_WORKER:
                    break
                # Blocks here while the worker is paused.
                self._pause_event.wait()
                try:
                    callback()
                except Exception:
                    logger.error("Failed processing job", exc_info=True)
            finally:
                # Always mark the item done (sentinel included) so joins
                # on the queue can complete.
                q.task_done()
            # Yield to other threads between jobs.
            sleep(0)

    def pause(self) -> None:
        """Make the worker loop wait before running further callbacks."""
        self._pause_event.clear()

    def resume(self) -> None:
        """Let a paused worker loop continue running callbacks."""
        self._pause_event.set()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc