• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26751910360

01 Jun 2026 11:24AM UTC coverage: 52.046% (-40.7%) from 92.792%
26751910360

Pull #23395

github

web-flow
Merge 41608d741 into c8127c1f4
Pull Request #23395: Bump the gha-deps group with 8 updates

31994 of 61473 relevant lines covered (52.05%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/observability/opentelemetry/single_threaded_processor.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
×
5

6
import datetime
×
7
import logging
×
8
import queue
×
9
from dataclasses import dataclass
×
10
from enum import Enum
×
11
from threading import Event, Thread
×
12

13
from pants.backend.observability.opentelemetry.processor import (
×
14
    IncompleteWorkunit,
15
    Processor,
16
    ProcessorContext,
17
    Workunit,
18
)
19

20
# Module-level logger; the processing thread uses it to warn when workunits are
# still queued at the moment the finish message is handled.
logger = logging.getLogger(__name__)
×
21

22

23
class _MessageType(Enum):
    """Tag identifying the kind of message placed on the processor's queue.

    The tag is the first element of every queued tuple and selects which
    operation the processing thread performs for that message.
    """

    # Payload is an `IncompleteWorkunit`; dispatched to `start_workunit`.
    START_WORKUNIT = "start_workunit"
    # Payload is a `Workunit`; dispatched to `complete_workunit`.
    COMPLETE_WORKUNIT = "complete_workunit"
    # Payload is a `_FinishDetails`; ends the processing loop.
    FINISH = "finish"
×
27

28

29
@dataclass
class _FinishDetails:
    """Arguments captured from a `finish` call, carried as the payload of the FINISH message.

    The processing thread hands both values unchanged to the wrapped
    processor's `finish` method.
    """

    # Timeout given by the caller of `finish`; `None` means no timeout was supplied.
    timeout: datetime.timedelta | None
    # Context given by the caller of `finish`.
    context: ProcessorContext
×
33

34

35
class SingleThreadedProcessor(Processor):
    """This is a `Processor` implementation which pushes all received workunits
    onto a queue for processing on a separate thread.

    This is useful for moving workunit operations off the engine's
    thread. Also, it allows working around any concurrency issues in an
    underlying `Processor` implementation since all operations will
    occur on a single, separate thread.

    If the wrapped processor raises on the processing thread, that thread
    exits. A later (or already blocked) call to `finish` then fails with
    `RuntimeError` instead of waiting for a completion signal that can
    never arrive.
    """

    def __init__(self, processor: Processor) -> None:
        """Wrap `processor`; the processing thread is created here but not started.

        Args:
            processor: The underlying `Processor` whose methods are invoked
                from the processing thread.
        """
        self._processor = processor

        # Set by the processing thread once the wrapped `initialize` has returned.
        self._initialize_completed_event = Event()
        # Set by the processing thread when it leaves its loop, whether or not
        # shutdown succeeded; `_finish_succeeded` tells the two cases apart.
        self._finish_completed_event = Event()
        # Becomes True only after the wrapped `finish` has returned normally.
        self._finish_succeeded = False

        self._queue: queue.Queue[
            tuple[
                _MessageType,
                Workunit | IncompleteWorkunit | _FinishDetails,
                ProcessorContext,
            ]
        ] = queue.Queue()

        self._thread = Thread(target=self._processing_loop)
        self._thread.daemon = True

    def _handle_message(
        self,
        msg: tuple[_MessageType, Workunit | IncompleteWorkunit | _FinishDetails, ProcessorContext],
    ) -> _FinishDetails | None:
        """Processes messages.

        Args:
            msg: A `(type, payload, context)` tuple taken from the queue.

        Returns:
            A `_FinishDetails` to use for shutdown if the finish message was
            received, else None.

        Raises:
            AssertionError: If the message type is unknown or the payload does
                not match the message type.
        """
        msg_type, payload, context = msg

        if msg_type == _MessageType.START_WORKUNIT:
            assert isinstance(payload, IncompleteWorkunit)
            self._processor.start_workunit(workunit=payload, context=context)
            return None

        if msg_type == _MessageType.COMPLETE_WORKUNIT:
            assert isinstance(payload, Workunit)
            self._processor.complete_workunit(workunit=payload, context=context)
            return None

        if msg_type == _MessageType.FINISH:
            # Finish signalled. Let caller know what context to use for it.
            assert isinstance(payload, _FinishDetails)
            return payload

        raise AssertionError("Received unknown message type in SingleThreadedProcessor.")

    def _processing_loop(self) -> None:
        """Thread target: initialize the wrapped processor, drain the queue, then finish.

        Messages are handled in queue order until the FINISH message arrives.
        The completion event is always set when this method exits so that a
        caller blocked in `finish` is released even if the wrapped processor
        raised; in that case `_finish_succeeded` stays False and the exception
        still propagates out of the thread target.
        """
        try:
            self._processor.initialize()
            self._initialize_completed_event.set()

            finish_details: _FinishDetails | None = None
            while finish_details is None:
                finish_details = self._handle_message(self._queue.get())

            if self._queue.qsize() > 0:
                logger.warning(
                    "Completion of workunit export was signalled before all workunits in flight were processed!"
                )

            self._processor.finish(timeout=finish_details.timeout, context=finish_details.context)
            # Record success before signalling so the waiter observes it.
            self._finish_succeeded = True
        finally:
            self._finish_completed_event.set()

    def initialize(self) -> None:
        """Start the processing thread and wait for the wrapped processor to initialize.

        Raises:
            RuntimeError: If initialization is not reported within 5 seconds.
        """
        self._thread.start()
        if not self._initialize_completed_event.wait(5.0):
            raise RuntimeError("Work unit processor failed to report initialization.")

    def start_workunit(self, workunit: IncompleteWorkunit, *, context: ProcessorContext) -> None:
        """Queue a started workunit for the processing thread; does not block."""
        self._queue.put_nowait((_MessageType.START_WORKUNIT, workunit, context))

    def complete_workunit(self, workunit: Workunit, *, context: ProcessorContext) -> None:
        """Queue a completed workunit for the processing thread; does not block."""
        self._queue.put_nowait((_MessageType.COMPLETE_WORKUNIT, workunit, context))

    def finish(
        self, timeout: datetime.timedelta | None = None, *, context: ProcessorContext
    ) -> None:
        """Signal shutdown and wait for the processing thread to finish.

        Args:
            timeout: Maximum time to wait for completion, and separately for
                the thread join; `None` waits without a limit. The same value
                is forwarded to the wrapped processor's `finish`.
            context: Context forwarded to the wrapped processor's `finish`.

        Raises:
            RuntimeError: If completion is not reported within `timeout`, or
                if the processing thread exited without the wrapped
                processor's `finish` returning normally.
        """
        self._queue.put_nowait(
            (_MessageType.FINISH, _FinishDetails(timeout=timeout, context=context), context)
        )
        timeout_seconds_opt = timeout.total_seconds() if timeout is not None else None
        signalled = self._finish_completed_event.wait(timeout_seconds_opt)
        # The event is also set when the worker dies, so check the success flag too.
        if not (signalled and self._finish_succeeded):
            raise RuntimeError("Work unit processor failed to report completion.")
        self._thread.join(timeout=timeout_seconds_opt)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc