• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scope3data / scope3ai-py / 14097016956

27 Mar 2025 01:41AM UTC coverage: 96.23% (+15.7%) from 80.557%
14097016956

Pull #92

github

5758a3
dearlordylord
feat(api): client-to-provider dry
Pull Request #92: feat: Managed Service Kebabs

53 of 55 new or added lines in 11 files covered. (96.36%)

44 existing lines in 10 files now uncovered.

2578 of 2679 relevant lines covered (96.23%)

3.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.58
/scope3ai/worker.py
1
import logging
4✔
2
import queue
4✔
3
import threading
4✔
4
from time import monotonic, sleep
4✔
5
from typing import Callable, Optional
4✔
6

7
logger = logging.getLogger("scope3ai.worker")
4✔
8

9

10
class BackgroundWorker:
    """Run submitted callbacks sequentially on a lazily started daemon thread.

    Callbacks are buffered in a bounded ``queue.Queue`` of ``size`` slots.
    ``submit`` never blocks: when the buffer is full the callback is
    rejected and ``False`` is returned. The worker thread is created on the
    first ``submit`` (or an explicit ``start``) and can be paused, resumed,
    flushed and killed.
    """

    # Sentinel placed on the queue by ``kill``; ``_run`` exits when it sees it.
    STOP_WORKER = object()

    def __init__(self, size: int) -> None:
        """Create an idle worker whose queue holds at most ``size`` items."""
        self._size = size
        self._queue: queue.Queue = queue.Queue(maxsize=size)
        # Serialises start/kill/flush against each other.
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        # Set means "running"; cleared means "paused". Starts in running state.
        self._pause_event = threading.Event()
        self._pause_event.set()

    @property
    def is_alive(self) -> bool:
        """Return True only if a worker thread exists and is still running."""
        thread = self._thread
        # Explicit ``is not None`` so the result is always a real bool
        # (the previous ``and`` chain leaked ``None`` when no thread existed).
        return thread is not None and thread.is_alive()

    def _ensure_thread(self) -> None:
        """Start the worker thread if it is not currently alive."""
        if not self.is_alive:
            self.start()

    def submit(self, callback: Callable[[], None]) -> bool:
        """Queue ``callback`` for execution on the worker thread.

        Returns:
            True if the callback was enqueued, False if the queue was full
            and the callback was dropped.
        """
        self._ensure_thread()
        try:
            self._queue.put_nowait(callback)
        except queue.Full:
            return False
        return True

    def start(self) -> None:
        """Start the daemon worker thread unless one is already alive."""
        with self._lock:
            # Re-check under the lock: another caller may have started it.
            if self.is_alive:
                return
            logger.debug("Starting background worker")
            self._thread = threading.Thread(
                target=self._run,
                name="scope3ai.BackgroundWorker",
                daemon=True,
            )
            try:
                self._thread.start()
            except RuntimeError:
                # Best effort: the thread could not be started, so forget it
                # rather than keep a reference to a thread that never ran.
                logger.debug(
                    "Failed to start background worker", exc_info=True
                )
                self._thread = None

    def kill(self) -> None:
        """Ask the worker thread to stop and detach from it.

        The stop sentinel is enqueued without blocking. Whether or not that
        succeeds, the thread reference is dropped and a fresh empty queue is
        installed, so anything still sitting in the old queue is no longer
        reachable through this object.
        """
        logger.debug("Got kill signal")
        with self._lock:
            if not self._thread:
                return
            try:
                self._queue.put_nowait(self.STOP_WORKER)
            except queue.Full:
                logger.debug("Failed to kill worker")
            # ``queue.ShutDown`` only exists on newer Python versions; the
            # empty-tuple fallback matches nothing, so older interpreters do
            # not hit an AttributeError while evaluating this clause.
            except getattr(queue, "ShutDown", ()):
                logger.debug("Worker already shutdown")
            self._thread = None
            self._queue = queue.Queue(maxsize=self._size)

    def flush(self, timeout: float = 5) -> None:
        """Wait up to ``timeout`` seconds for queued callbacks to finish.

        Does nothing if the worker thread is not alive. The internal lock is
        held for the duration of the wait.
        """
        logger.debug("Got flush signal")
        with self._lock:
            if not self.is_alive:
                return
            self._wait_flush(timeout)
        logger.debug("Worker flushed")

    def _wait_flush(self, timeout: float) -> None:
        """Join the queue in two phases, logging if work is still pending.

        A short first wait (at most 0.1s) covers the common already-drained
        case quietly; only if that fails is the remaining budget spent and,
        on a second failure, an error logged.
        """
        initial_timeout = min(0.1, timeout)
        if self._timed_queue_join(initial_timeout):
            return

        # +1 presumably accounts for the item currently being processed,
        # which is no longer counted by qsize().
        pending = self._queue.qsize() + 1
        logger.debug("%d event(s) pending on flush", pending)

        if not self._timed_queue_join(timeout - initial_timeout):
            pending = self._queue.qsize() + 1
            logger.error("flush timed out, dropped %d events", pending)

    def _timed_queue_join(self, timeout: float) -> bool:
        """Wait until the queue has no unfinished tasks or ``timeout`` elapses.

        Returns:
            True if all tasks were marked done in time, False on timeout.
        """
        deadline = monotonic() + timeout
        q = self._queue

        # ``all_tasks_done`` is the Condition that ``task_done`` notifies;
        # the ``with`` block acquires and always releases it.
        with q.all_tasks_done:
            while q.unfinished_tasks:
                delay = deadline - monotonic()
                if delay <= 0:
                    return False
                q.all_tasks_done.wait(timeout=delay)
            return True

    def _run(self) -> None:
        """Worker loop: pull callbacks off the queue and run them until stopped."""
        # Bind the queue once: ``kill`` swaps ``self._queue`` for a new one,
        # and this thread must keep draining the queue it was started with.
        q = self._queue
        while True:
            callback = q.get()
            try:
                if callback is self.STOP_WORKER:
                    break
                # Block here while paused; the dequeued callback runs on resume.
                self._pause_event.wait()
                try:
                    callback()
                except Exception:
                    # A failing job must not take down the worker thread.
                    logger.error("Failed processing job", exc_info=True)
            finally:
                # Always mark the item done (including the stop sentinel)
                # so queue joins do not wait on it forever.
                q.task_done()
            # Yield to other threads between jobs.
            sleep(0)

    def pause(self) -> None:
        """Stop the worker from running further callbacks until ``resume``."""
        self._pause_event.clear()

    def resume(self) -> None:
        """Allow the worker to run callbacks again after ``pause``."""
        self._pause_event.set()
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc