• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25565075335

08 May 2026 03:47PM UTC coverage: 92.787% (-0.1%) from 92.887%
25565075335

push

github

web-flow
add OpenTelemetry backend for work unit reporting (#23284)

# Overview

Add a new `pants.backend.observability.opentelemetry` backend to report
work unit tracing to OpenTelemetry. The backend is based on
[shoalsoft-pants-opentelemetry-plugin](https://github.com/shoalsoft/shoalsoft-pants-opentelemetry-plugin)
with unnecessary compatibility code and "shoalsoft" branding removed.

Notes:
- This backend only reports Pants engine work units to OpenTelemetry; it
does not report tracing data for Pants rule code or Rust code.
- This backend does not support gRPC export due to fork safety issues
with the gRPC C library and Python. See
https://github.com/shoalsoft/shoalsoft-pants-opentelemetry-plugin/issues/84
and https://github.com/grpc/grpc/blob/master/doc/fork_support.md for
additional details.

# Lockfile

```
    Lockfile diff: 3rdparty/python/user_reqs.lock [python-default]

    ==                    Upgraded dependencies                     ==

      anyio                          4.12.1       -->   4.13.0
      certifi                        2026.1.4     -->   2026.4.22
      charset-normalizer             3.4.4        -->   3.4.7
      click                          8.3.1        -->   8.3.2
      cross-web                      0.4.1        -->   0.6.0
      cryptography                   46.0.5       -->   46.0.7
      graphql-core                   3.2.7        -->   3.2.8
      idna                           3.11         -->   3.12
      librt                          0.8.1        -->   0.9.0
      pydantic                       2.12.5       -->   2.13.3
      pydantic-core                  2.41.5       -->   2.46.3
      pygments                       2.19.2       -->   2.20.0
      pyjwt                          2.11.0       -->   2.12.1
      python-dotenv                  1.2.1        -->   1.2.2
      python-multipart               0.0.22       -->   0.0.26
      ujson                          5.11.0       -->   5.12.0

    ==                   ... (continued)

564 of 740 new or added lines in 12 files covered. (76.22%)

1 existing line in 1 file now uncovered.

92944 of 100169 relevant lines covered (92.79%)

4.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.03
/src/python/pants/backend/observability/opentelemetry/single_threaded_processor.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import datetime
2✔
7
import logging
2✔
8
import queue
2✔
9
from dataclasses import dataclass
2✔
10
from enum import Enum
2✔
11
from threading import Event, Thread
2✔
12

13
from pants.backend.observability.opentelemetry.processor import (
2✔
14
    IncompleteWorkunit,
15
    Processor,
16
    ProcessorContext,
17
    Workunit,
18
)
19

20
logger = logging.getLogger(__name__)
2✔
21

22

23
class _MessageType(Enum):
2✔
24
    START_WORKUNIT = "start_workunit"
2✔
25
    COMPLETE_WORKUNIT = "complete_workunit"
2✔
26
    FINISH = "finish"
2✔
27

28

29
@dataclass
2✔
30
class _FinishDetails:
2✔
31
    timeout: datetime.timedelta | None
2✔
32
    context: ProcessorContext
2✔
33

34

35
class SingleThreadedProcessor(Processor):
    """A `Processor` implementation which pushes all received workunits onto a
    queue for processing on a separate thread.

    This is useful for moving workunit operations off the engine's
    thread. Also, it allows working around any concurrency issues in an
    underlying `Processor` implementation since all operations will
    occur on a single, separate thread.
    """

    def __init__(self, processor: Processor) -> None:
        self._processor = processor

        # Set by the worker thread once the wrapped processor has initialized /
        # finished, so the public methods can block until those transitions occur.
        self._initialize_completed_event = Event()
        self._finish_completed_event = Event()

        # All operations flow through this queue as (message type, payload, context)
        # tuples, consumed solely by the worker thread.
        self._queue: queue.Queue[
            tuple[
                _MessageType,
                Workunit | IncompleteWorkunit | _FinishDetails,
                ProcessorContext,
            ]
        ] = queue.Queue()

        # Daemon thread so a wedged exporter cannot prevent interpreter exit.
        self._thread = Thread(target=self._processing_loop, daemon=True)

    def _handle_message(
        self,
        msg: tuple[_MessageType, Workunit | IncompleteWorkunit | _FinishDetails, ProcessorContext],
    ) -> _FinishDetails | None:
        """Dispatch a single queued message to the wrapped processor.

        Returns a `_FinishDetails` to use for shutdown if the finish
        message was received, else None.
        """
        msg_type, payload, context = msg
        if msg_type == _MessageType.START_WORKUNIT:
            assert isinstance(payload, IncompleteWorkunit)
            self._processor.start_workunit(workunit=payload, context=context)
            return None
        elif msg_type == _MessageType.COMPLETE_WORKUNIT:
            assert isinstance(payload, Workunit)
            self._processor.complete_workunit(workunit=payload, context=context)
            return None
        elif msg_type == _MessageType.FINISH:
            # Finish signalled. Let the caller know what details to use for shutdown.
            assert isinstance(payload, _FinishDetails)
            return payload
        else:
            raise AssertionError("Received unknown message type in SingleThreadedProcessor.")

    def _processing_loop(self) -> None:
        """Worker thread entry point: initialize, drain the queue, then finish."""
        self._processor.initialize()
        self._initialize_completed_event.set()

        # `Queue.get()` blocks until a message arrives, and every message is a
        # non-empty (hence truthy) tuple, so the only way out of this loop is the
        # explicit `break` on the finish message. Using `while True` (rather than
        # testing the message's truthiness, which could never be False but would
        # leave `finish_details` unbound if it somehow were) makes that invariant
        # explicit and guarantees `finish_details` is bound after the loop.
        finish_details: _FinishDetails
        while True:
            finish_details_opt = self._handle_message(self._queue.get())
            if finish_details_opt is not None:
                finish_details = finish_details_opt
                break

        if self._queue.qsize() > 0:
            logger.warning(
                "Completion of workunit export was signalled before all workunits in flight were processed!"
            )

        self._processor.finish(timeout=finish_details.timeout, context=finish_details.context)
        self._finish_completed_event.set()

    def initialize(self) -> None:
        """Start the worker thread and block until the wrapped processor has initialized.

        Raises:
            RuntimeError: If initialization does not complete within five seconds.
        """
        self._thread.start()
        if not self._initialize_completed_event.wait(5.0):
            raise RuntimeError("Work unit processor failed to report initialization.")

    def start_workunit(self, workunit: IncompleteWorkunit, *, context: ProcessorContext) -> None:
        """Enqueue a started workunit for processing on the worker thread."""
        self._queue.put_nowait((_MessageType.START_WORKUNIT, workunit, context))

    def complete_workunit(self, workunit: Workunit, *, context: ProcessorContext) -> None:
        """Enqueue a completed workunit for processing on the worker thread."""
        self._queue.put_nowait((_MessageType.COMPLETE_WORKUNIT, workunit, context))

    def finish(
        self, timeout: datetime.timedelta | None = None, *, context: ProcessorContext
    ) -> None:
        """Signal shutdown and block until the worker thread has finished.

        Args:
            timeout: Maximum time to wait for the worker thread to report
                completion; None waits indefinitely.
            context: The processor context to forward to the wrapped
                processor's `finish` call.

        Raises:
            RuntimeError: If the worker thread does not report completion in time.
        """
        self._queue.put_nowait(
            (_MessageType.FINISH, _FinishDetails(timeout=timeout, context=context), context)
        )
        timeout_seconds_opt = timeout.total_seconds() if timeout is not None else None
        if not self._finish_completed_event.wait(timeout_seconds_opt):
            raise RuntimeError("Work unit processor failed to report completion.")
        # NOTE(review): the event wait above and this join each apply the full
        # timeout, so the total wait can be up to twice `timeout` in the worst
        # case. In practice the event is set just before the thread exits, so
        # the join returns almost immediately.
        self._thread.join(timeout=timeout_seconds_opt)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc