• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26080722777

19 May 2026 06:37AM UTC coverage: 52.106% (-11.5%) from 63.597%
26080722777

Pull #23250

github

web-flow
Merge 63ec06323 into 2693df832
Pull Request #23250: Feature: Add generic option to docker image

12 of 50 new or added lines in 3 files covered. (24.0%)

5382 existing lines in 201 files now uncovered.

32053 of 61515 relevant lines covered (52.11%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/observability/opentelemetry/single_threaded_processor.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import datetime
×
UNCOV
7
import logging
×
UNCOV
8
import queue
×
UNCOV
9
from dataclasses import dataclass
×
UNCOV
10
from enum import Enum
×
UNCOV
11
from threading import Event, Thread
×
12

UNCOV
13
from pants.backend.observability.opentelemetry.processor import (
×
14
    IncompleteWorkunit,
15
    Processor,
16
    ProcessorContext,
17
    Workunit,
18
)
19

UNCOV
20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
×
21

22

UNCOV
23
class _MessageType(Enum):
    """Tags the kind of message placed on the `SingleThreadedProcessor`
    queue."""

    # Payload is an `IncompleteWorkunit` to pass to `start_workunit`.
    START_WORKUNIT = "start_workunit"
    # Payload is a `Workunit` to pass to `complete_workunit`.
    COMPLETE_WORKUNIT = "complete_workunit"
    # Payload is a `_FinishDetails`; ends the processing loop.
    FINISH = "finish"
×
27

28

UNCOV
29
@dataclass(frozen=True)
class _FinishDetails:
    """Arguments captured by `finish()` for the worker thread to forward to
    the wrapped processor's `finish()`.

    Frozen because an instance is handed from the calling thread to the
    worker thread via the queue and is never modified afterwards.
    """

    # Upper bound passed through to the wrapped processor's `finish()`, or
    # None for no bound.
    timeout: datetime.timedelta | None
    # Context passed through to the wrapped processor's `finish()`.
    context: ProcessorContext
×
33

34

UNCOV
35
class SingleThreadedProcessor(Processor):
    """This is a `Processor` implementation which pushes all received workunits
    onto a queue for processing on a separate thread.

    This is useful for moving workunit operations off the engine's
    thread. Also, it allows working around any concurrency issues in an
    underlying `Processor` implementation since all operations will
    occur on a single, separate thread.

    If the wrapped processor raises on the worker thread, the failure is
    recorded and surfaced as a `RuntimeError` from `initialize()` or
    `finish()` rather than leaving the caller blocked on an event that
    will never be set.
    """

    # How long `initialize()` waits for the worker thread to report that the
    # wrapped processor has been initialized.
    _INITIALIZE_TIMEOUT_SECONDS = 5.0

    def __init__(self, processor: Processor) -> None:
        """Wrap `processor`; the worker thread is created here but only
        started by `initialize()`."""
        self._processor = processor

        self._initialize_completed_event = Event()
        self._finish_completed_event = Event()

        # Exception that terminated the worker thread, if any. Written by the
        # worker before it signals the events above; read by callers only
        # after waiting on one of those events.
        self._worker_error: BaseException | None = None

        self._queue: queue.Queue[
            tuple[
                _MessageType,
                Workunit | IncompleteWorkunit | _FinishDetails,
                ProcessorContext,
            ]
        ] = queue.Queue()

        # Daemon thread so that a stuck processor cannot keep the process
        # alive at interpreter exit.
        self._thread = Thread(target=self._processing_loop)
        self._thread.daemon = True

    def _handle_message(
        self,
        msg: tuple[_MessageType, Workunit | IncompleteWorkunit | _FinishDetails, ProcessorContext],
    ) -> _FinishDetails | None:
        """Processes messages.

        Returns a `_FinishDetails` to use for shutdown if the finish
        message was received, else None.

        Raises `AssertionError` if the message type is unknown or the
        payload does not match the message type.
        """
        msg_type, payload, context = msg
        if msg_type == _MessageType.START_WORKUNIT:
            assert isinstance(payload, IncompleteWorkunit)
            self._processor.start_workunit(workunit=payload, context=context)
            return None
        elif msg_type == _MessageType.COMPLETE_WORKUNIT:
            assert isinstance(payload, Workunit)
            self._processor.complete_workunit(workunit=payload, context=context)
            return None
        elif msg_type == _MessageType.FINISH:
            # Finish signalled. Let caller know what context to use for it.
            assert isinstance(payload, _FinishDetails)
            return payload
        else:
            raise AssertionError("Received unknown message type in SingleThreadedProcessor.")

    def _run_worker(self) -> None:
        """Initialize the wrapped processor, drain the queue until the finish
        message arrives, then finish the wrapped processor."""
        self._processor.initialize()
        self._initialize_completed_event.set()

        # `Queue.get()` blocks until a message is available; the loop only
        # ends once the finish message has been handled.
        while True:
            finish_details = self._handle_message(self._queue.get())
            if finish_details is not None:
                break

        if self._queue.qsize() > 0:
            logger.warning(
                "Completion of workunit export was signalled before all workunits in flight were processed!"
            )

        self._processor.finish(timeout=finish_details.timeout, context=finish_details.context)

    def _processing_loop(self) -> None:
        """Worker thread entry point.

        Runs the worker and guarantees that both events are signalled
        when the thread exits, whether or not the wrapped processor
        raised. Any exception is recorded for `initialize()`/`finish()`
        and then re-raised so the thread still terminates with it.
        """
        try:
            self._run_worker()
        except BaseException as exc:
            # Record before the `finally` below signals the events, so that a
            # caller woken by an event always observes the error.
            self._worker_error = exc
            raise
        finally:
            # Setting an already-set event is a no-op; this only matters on
            # the failure path, where it releases any blocked caller.
            self._initialize_completed_event.set()
            self._finish_completed_event.set()

    def initialize(self) -> None:
        """Start the worker thread and wait for the wrapped processor to be
        initialized on it.

        Raises `RuntimeError` if initialization is not reported in time
        or the worker thread failed.
        """
        self._thread.start()
        if not self._initialize_completed_event.wait(self._INITIALIZE_TIMEOUT_SECONDS):
            raise RuntimeError("Work unit processor failed to report initialization.")
        if self._worker_error is not None:
            raise RuntimeError(
                "Work unit processor failed to report initialization."
            ) from self._worker_error

    def start_workunit(self, workunit: IncompleteWorkunit, *, context: ProcessorContext) -> None:
        """Queue a started workunit for the worker thread; does not block."""
        self._queue.put_nowait((_MessageType.START_WORKUNIT, workunit, context))

    def complete_workunit(self, workunit: Workunit, *, context: ProcessorContext) -> None:
        """Queue a completed workunit for the worker thread; does not block."""
        self._queue.put_nowait((_MessageType.COMPLETE_WORKUNIT, workunit, context))

    def finish(
        self, timeout: datetime.timedelta | None = None, *, context: ProcessorContext
    ) -> None:
        """Signal the worker thread to finish and wait for it to do so.

        `timeout` bounds the wait for completion (None waits without
        bound) and is also forwarded to the wrapped processor's
        `finish()`.

        Raises `RuntimeError` if completion is not reported in time or
        the worker thread failed.
        """
        self._queue.put_nowait(
            (_MessageType.FINISH, _FinishDetails(timeout=timeout, context=context), context)
        )
        timeout_seconds_opt = timeout.total_seconds() if timeout is not None else None
        if not self._finish_completed_event.wait(timeout_seconds_opt):
            raise RuntimeError("Work unit processor failed to report completion.")
        self._thread.join(timeout=timeout_seconds_opt)
        if self._worker_error is not None:
            # The worker died before finishing cleanly; report it with the
            # same error callers already handle for a missed completion.
            raise RuntimeError(
                "Work unit processor failed to report completion."
            ) from self._worker_error
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc