• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25565075335

08 May 2026 03:47PM UTC coverage: 92.787% (-0.1%) from 92.887%
25565075335

push

github

web-flow
add OpenTelemetry backend for work unit reporting (#23284)

# Overview

Add a new `pants.backend.observability.opentelemetry` backend to report
work unit tracing to OpenTelemetry. The backend is based on
[shoalsoft-pants-opentelemetry-plugin](https://github.com/shoalsoft/shoalsoft-pants-opentelemetry-plugin)
with unnecessary compatibility code and "shoalsoft" branding removed.

Notes:
- This backend only reports Pants engine work units to OpenTelemetry; it
does not report tracing data for Pants rule code or Rust code.
- This backend does not support gRPC export due to fork safety issues
with the gRPC C library and Python. See
https://github.com/shoalsoft/shoalsoft-pants-opentelemetry-plugin/issues/84
and https://github.com/grpc/grpc/blob/master/doc/fork_support.md for
additional details.

# Lockfile

```
    Lockfile diff: 3rdparty/python/user_reqs.lock [python-default]

    ==                    Upgraded dependencies                     ==

      anyio                          4.12.1       -->   4.13.0
      certifi                        2026.1.4     -->   2026.4.22
      charset-normalizer             3.4.4        -->   3.4.7
      click                          8.3.1        -->   8.3.2
      cross-web                      0.4.1        -->   0.6.0
      cryptography                   46.0.5       -->   46.0.7
      graphql-core                   3.2.7        -->   3.2.8
      idna                           3.11         -->   3.12
      librt                          0.8.1        -->   0.9.0
      pydantic                       2.12.5       -->   2.13.3
      pydantic-core                  2.41.5       -->   2.46.3
      pygments                       2.19.2       -->   2.20.0
      pyjwt                          2.11.0       -->   2.12.1
      python-dotenv                  1.2.1        -->   1.2.2
      python-multipart               0.0.22       -->   0.0.26
      ujson                          5.11.0       -->   5.12.0

    ==                   ... (continued)

564 of 740 new or added lines in 12 files covered. (76.22%)

1 existing line in 1 file now uncovered.

92944 of 100169 relevant lines covered (92.79%)

4.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.29
/src/python/pants/backend/observability/opentelemetry/opentelemetry_processor.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import datetime
1✔
7
import json
1✔
8
import logging
1✔
9
import os
1✔
10
import typing
1✔
11
from contextlib import contextmanager
1✔
12
from pathlib import Path
1✔
13
from typing import TextIO
1✔
14

15
from opentelemetry import trace
1✔
16
from opentelemetry.context import Context
1✔
17
from opentelemetry.exporter.otlp.proto.http import Compression
1✔
18
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
1✔
19
    OTLPSpanExporter as HttpOTLPSpanExporter,
20
)
21
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
1✔
22
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, TracerProvider, sampling
1✔
23
from opentelemetry.sdk.trace.export import (
1✔
24
    BatchSpanProcessor,
25
    SpanExporter,
26
    SpanExportResult,
27
)
28
from opentelemetry.trace import Link, TraceFlags
1✔
29
from opentelemetry.trace.span import (
1✔
30
    NonRecordingSpan,
31
    Span,
32
    SpanContext,
33
    format_span_id,
34
    format_trace_id,
35
)
36
from opentelemetry.trace.status import StatusCode
1✔
37
from pants.backend.observability.opentelemetry.opentelemetry_config import OtlpParameters
1✔
38
from pants.backend.observability.opentelemetry.processor import (
1✔
39
    IncompleteWorkunit,
40
    Level,
41
    Processor,
42
    ProcessorContext,
43
    Workunit,
44
)
45
from pants.backend.observability.opentelemetry.subsystem import TracingExporterId
1✔
46
from pants.util.frozendict import FrozenDict
1✔
47

48
logger = logging.getLogger(__name__)
1✔
49

50
_UNIX_EPOCH = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.UTC)
1✔
51

52

53
@contextmanager
1✔
54
def _temp_env_var(key: str, value: str | None):
1✔
55
    """Temporarily set an environment variable, restoring the original value
56
    afterward."""
57
    old_value = os.environ.get(key)
1✔
58
    try:
1✔
59
        if value is not None:
1✔
NEW
60
            os.environ[key] = value
×
61
        yield
1✔
62
    finally:
63
        if old_value is None:
1✔
64
            os.environ.pop(key, None)
1✔
65
        else:
NEW
66
            os.environ[key] = old_value
×
67

68

69
def _datetime_to_otel_timestamp(d: datetime.datetime) -> int:
    """Convert `d` to an OTEL timestamp: integer nanoseconds since the Unix epoch."""
    elapsed = d - _UNIX_EPOCH
    # Work in integers throughout so no precision is lost to floating point.
    whole_seconds = elapsed.days * 24 * 60 * 60 + elapsed.seconds
    return whole_seconds * 1_000_000_000 + elapsed.microseconds * 1_000
×
76

77

78
class JsonFileSpanExporter(SpanExporter):
    """Span exporter that writes each span as one line of JSON to a text stream."""

    def __init__(self, file: TextIO) -> None:
        # The exporter owns the stream from here on: `shutdown` closes it.
        self._file = file

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        """Write every span in `spans` to the stream, one per line, and report success."""
        sink = self._file
        for span in spans:
            # Fold any newlines in the serialized span into spaces so each span stays on one line.
            one_line = " ".join(span.to_json(indent=0).split("\n"))
            sink.write(one_line + "\n")
        return SpanExportResult.SUCCESS

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Flush the stream; `timeout_millis` is accepted but not used."""
        self._file.flush()
        return True

    def shutdown(self) -> None:
        """Close the underlying stream."""
        self._file.close()
×
93

94

95
def get_processor(
    span_exporter_name: TracingExporterId,
    otlp_parameters: OtlpParameters,
    build_root: Path,
    traceparent_env_var: str | None,
    otel_resource_attributes: str | None,
    json_file: str | None,
    trace_link_template: str | None,
) -> Processor:
    """Create an `OpenTelemetryProcessor` backed by the selected span exporter.

    Args:
        span_exporter_name: Which exporter to build (`OTLP` over HTTP, or `JSON_FILE`).
        otlp_parameters: Endpoint, TLS, header, timeout and compression settings for OTLP export.
        build_root: Directory that `json_file` is joined onto.
        traceparent_env_var: Optional `traceparent` value handed to the processor.
        otel_resource_attributes: Optional value exposed as `OTEL_RESOURCE_ATTRIBUTES` while the
            `Resource` is being created.
        json_file: Output path for the `JSON_FILE` exporter.
        trace_link_template: Optional template handed to the processor for logging a trace link.

    Returns:
        The configured processor.

    Raises:
        ValueError: If the `JSON_FILE` exporter is selected but `json_file` is empty or None.
        AssertionError: If `span_exporter_name` is not a known exporter.
    """
    logger.debug(f"OTEL: get_processor: otlp_parameters={otlp_parameters}; build_root={build_root}")

    # Temporarily set OTEL_RESOURCE_ATTRIBUTES so Resource.create() can parse it
    with _temp_env_var("OTEL_RESOURCE_ATTRIBUTES", otel_resource_attributes):
        # Resource.create() will automatically merge OTEL_RESOURCE_ATTRIBUTES from os.environ
        resource = Resource.create(
            attributes={
                SERVICE_NAME: "pantsbuild",
            }
        )
    tracer_provider = TracerProvider(
        sampler=sampling.ALWAYS_ON, resource=resource, shutdown_on_exit=False
    )
    tracer = tracer_provider.get_tracer(__name__)

    span_exporter: SpanExporter
    if span_exporter_name == TracingExporterId.OTLP:
        span_exporter = HttpOTLPSpanExporter(
            endpoint=otlp_parameters.resolve_traces_endpoint(),
            certificate_file=otlp_parameters.certificate_file,
            client_key_file=otlp_parameters.client_key_file,
            client_certificate_file=otlp_parameters.client_certificate_file,
            headers=dict(otlp_parameters.headers) if otlp_parameters.headers else None,
            timeout=otlp_parameters.timeout,
            compression=Compression(otlp_parameters.compression),
        )
    elif span_exporter_name == TracingExporterId.JSON_FILE:
        json_file_path_str = json_file
        if not json_file_path_str:
            raise ValueError(
                f"`--opentelemetry-exporter` is set to `{TracingExporterId.JSON_FILE}` "
                "but the `--opentelemetry-json-file` option is not set."
            )
        json_file_path = build_root / json_file_path_str
        json_file_path.parent.mkdir(parents=True, exist_ok=True)
        # Pin the encoding so the output does not depend on the locale. The exporter takes
        # ownership of the handle and closes it in its `shutdown`.
        span_exporter = JsonFileSpanExporter(open(json_file_path, "w", encoding="utf-8"))
        logger.debug(f"Enabling OpenTelemetry JSON file span exporter: path={json_file_path}")
    else:
        raise AssertionError(f"Unknown span exporter type: {span_exporter_name.value}")

    span_processor = BatchSpanProcessor(
        span_exporter=span_exporter,
        max_queue_size=512,
        max_export_batch_size=100,
        export_timeout_millis=5000,
        schedule_delay_millis=30000,
    )
    tracer_provider.add_span_processor(span_processor)

    otel_processor = OpenTelemetryProcessor(
        tracer=tracer,
        span_processor=span_processor,
        traceparent_env_var=traceparent_env_var,
        tracer_provider=tracer_provider,
        trace_link_template=trace_link_template,
    )

    return otel_processor
1✔
162

163

164
class DummySpan(NonRecordingSpan):
    """Stand-in span that reports itself as recording.

    An instance wrapping a chosen span context is placed in the OpenTelemetry
    context so that a newly started span takes its parent span ID from it.
    """

    def __repr__(self) -> str:
        return "DummySpan(" + repr(self._context) + ")"

    def is_recording(self) -> bool:
        return True
×
176

177

178
def _parse_id(id_hex: str, id_hex_chars_len: int) -> int:
    """Parse a fixed-width hexadecimal identifier into an integer.

    Hyphens are dropped and a leading `0x`/`0X` prefix is tolerated. Everything
    that remains must be exactly `id_hex_chars_len` hexadecimal digits.

    Args:
        id_hex: The identifier text.
        id_hex_chars_len: Required number of hex digits after normalization.

    Returns:
        The identifier as an integer.

    Raises:
        ValueError: If the normalized text has the wrong length or contains a
            character that is not a hexadecimal digit.
    """
    # Lower-case first so an upper-case "0X" prefix is recognized too, and strip the
    # prefix only from the front rather than wherever "0x" happens to occur.
    normalized = id_hex.replace("-", "").lower().removeprefix("0x")

    # Check if the length is correct for the given ID type.
    if len(normalized) != id_hex_chars_len:
        raise ValueError(
            f"Invalid ID length: expected {id_hex_chars_len} hex chars, got {len(normalized)} instead."
        )

    # `int(..., 16)` on its own also accepts signs, surrounding whitespace and `_`
    # separators, so insist on hex digits explicitly.
    if any(ch not in "0123456789abcdef" for ch in normalized):
        raise ValueError(f"Invalid ID `{id_hex}`: contains non-hexadecimal characters.")

    return int(normalized, 16)
×
190

191

192
def _parse_traceparent(value: str) -> tuple[int, int] | None:
    """Extract `(trace_id, span_id)` from a hyphen-separated `traceparent` value.

    The second field is read as a 32-hex-digit trace ID and the third as a
    16-hex-digit span ID. Returns None when there are fewer than three fields,
    or (after logging a warning) when either ID fails to parse.
    """
    fields = value.split("-")
    if len(fields) < 3:
        return None

    parsed: list[int] = []
    # Trace ID first, then span ID; stop at the first field that does not parse.
    for label, raw, width in (("trace", fields[1], 32), ("span", fields[2], 16)):
        try:
            parsed.append(_parse_id(raw, width))
        except ValueError as e:
            logger.warning(f"Ignoring TRACEPARENT due to failure to parse {label} ID `{raw}`: {e}")
            return None

    trace_id, span_id = parsed
    return trace_id, span_id
×
210

211

212
class _Encoder(json.JSONEncoder):
    """JSON encoder that can also serialize `FrozenDict` values."""

    def default(self, o):
        # For a `FrozenDict`, hand back its `_data` attribute for the encoder to serialize;
        # anything else is left to the base class.
        return o._data if isinstance(o, FrozenDict) else super().default(o)
×
217

218

219
class OpenTelemetryProcessor(Processor):
    """Processor that reports workunits as OpenTelemetry spans.

    Each workunit becomes one span created through `tracer`. The span ID chosen by the tracer
    is remembered per workunit span ID so that later workunits can be parented to it.
    """

    def __init__(
        self,
        tracer: trace.Tracer,
        span_processor: SpanProcessor,
        traceparent_env_var: str | None,
        tracer_provider: TracerProvider,
        trace_link_template: str | None,
    ) -> None:
        """Store the collaborators and parse the optional `traceparent` value.

        Args:
            tracer: Tracer used to start every span.
            span_processor: Span processor that `finish` flushes and shuts down.
            traceparent_env_var: Optional `traceparent` value; when it parses, the root span
                is linked to the trace and span it names.
            tracer_provider: Tracer provider that `finish` shuts down.
            trace_link_template: Optional `str.format` template used to log a trace link when
                the root span completes.
        """
        self._tracer = tracer
        self._tracer_provider = tracer_provider
        # Trace ID shared by the spans; captured from the first span that is constructed.
        self._trace_id: int | None = None
        # Workunit span ID -> OTEL span ID. Entries are never removed by this class.
        self._workunit_span_id_to_otel_span_id: dict[str, int] = {}
        # OTEL span ID -> span, for spans that have been started but not yet ended.
        self._otel_spans: dict[int, trace.Span] = {}
        self._span_processor = span_processor
        # Number of spans ended so far.
        self._span_count: int = 0
        # Named diagnostic counters; logged at debug level by `finish`.
        self._counters: dict[str, int] = {}
        self._trace_link_template: str | None = trace_link_template
        self._initialized: bool = False
        self._shutdown: bool = False

        # IDs of the enclosing trace named by the `traceparent` value, if one was given and parsed.
        self._parent_trace_id: int | None = None
        self._parent_span_id: int | None = None
        if traceparent_env_var is not None:
            ids = _parse_traceparent(traceparent_env_var)
            if ids is not None:
                self._parent_trace_id, self._parent_span_id = ids

    def initialize(self) -> None:
        """Mark the processor as ready; raises RuntimeError if called more than once."""
        if self._initialized:
            raise RuntimeError("OTEL: processor already initialized")
        logger.debug("OpenTelemetryProcessor.initialize called")
        self._initialized = True

    def _increment_counter(self, name: str, delta: int = 1) -> None:
        """Add `delta` to the diagnostic counter `name`, creating it at zero if needed."""
        self._counters[name] = self._counters.get(name, 0) + delta

    def _log_trace_link(
        self,
        root_span_id: int,
        root_span_start_time: datetime.datetime,
        root_span_end_time: datetime.datetime,
    ) -> None:
        """Log the trace link template filled in for the root span; no-op without a template.

        The placeholders offered to the template are `trace_id`, `root_span_id`,
        `trace_start_ms` and `trace_end_ms`.
        """
        template = self._trace_link_template
        if not template:
            return

        replacements = {
            "trace_id": format_trace_id(self._trace_id) if self._trace_id else "UNKNOWN",
            "root_span_id": format_span_id(root_span_id),
            "trace_start_ms": str(int(root_span_start_time.timestamp() * 1000)),
            "trace_end_ms": str(int(root_span_end_time.timestamp() * 1000)),
        }
        trace_link = template.format(**replacements)
        logger.info(f"OpenTelemetry trace link: {trace_link}")

    def _construct_otel_span(
        self,
        *,
        workunit_span_id: str,
        workunit_parent_span_id: str | None,
        name: str,
        start_time: datetime.datetime,
    ) -> tuple[Span, int]:
        """Construct an OpenTelemetry span.

        Shared between `start_workunit` and `complete_workunit` since
        some spans may arrive already-completed.

        Returns:
            The new span and the span ID the tracer assigned to it.
        """
        assert workunit_span_id not in self._workunit_span_id_to_otel_span_id

        otel_context = Context()
        if workunit_parent_span_id:
            # OpenTelemetry pulls the parent span ID from the span set as "current" in the supplied context.
            assert self._trace_id is not None
            otel_parent_span_context = SpanContext(
                trace_id=self._trace_id,
                span_id=self._workunit_span_id_to_otel_span_id[workunit_parent_span_id],
                is_remote=False,
            )
            otel_context = trace.set_span_in_context(
                DummySpan(otel_parent_span_context), context=otel_context
            )

        # Record a "link" on the root span to any parent trace set via TRACEPARENT.
        links: list[Link] = []
        if not workunit_parent_span_id and self._parent_trace_id and self._parent_span_id:
            parent_trace_id_context = SpanContext(
                trace_id=self._parent_trace_id,
                span_id=self._parent_span_id,
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
            )
            links.append(Link(context=parent_trace_id_context))

        otel_span = self._tracer.start_span(
            name=name,
            context=otel_context,
            start_time=_datetime_to_otel_timestamp(start_time),
            record_exception=False,
            set_status_on_exception=False,
            links=links,
        )

        # Record the span ID chosen by the tracer for this span.
        otel_span_context = otel_span.get_span_context()
        otel_span_id = otel_span_context.span_id
        self._workunit_span_id_to_otel_span_id[workunit_span_id] = otel_span_id
        self._otel_spans[otel_span_id] = otel_span

        # Record the trace ID generated the first time any span is constructed.
        if self._trace_id is None:
            self._trace_id = otel_span_context.trace_id

        return otel_span, otel_span_id

    def _apply_incomplete_workunit_attributes(
        self, workunit: IncompleteWorkunit, otel_span: Span
    ) -> None:
        """Set the span ID, parent IDs and level attributes; ERROR level also sets error status."""
        otel_span.set_attribute("pantsbuild.workunit.span_id", workunit.span_id)
        otel_span.set_attribute("pantsbuild.workunit.parent_span_ids", workunit.parent_ids)

        otel_span.set_attribute("pantsbuild.workunit.level", workunit.level.value.upper())
        if workunit.level == Level.ERROR:
            otel_span.set_status(StatusCode.ERROR)

    def _apply_workunit_attributes(self, workunit: Workunit, otel_span: Span) -> None:
        """Set the basic attributes plus every metadata entry whose value is a scalar."""
        self._apply_incomplete_workunit_attributes(workunit=workunit, otel_span=otel_span)

        for key, value in workunit.metadata.items():
            # Only str/bool/int/float metadata values are copied; other values are skipped.
            if isinstance(value, (str, bool, int, float)):
                otel_span.set_attribute(f"pantsbuild.workunit.metadata.{key}", value)

    def start_workunit(self, workunit: IncompleteWorkunit, *, context: ProcessorContext) -> None:
        """Start a span for `workunit`.

        A repeated start for the same workunit span ID is counted and otherwise ignored.

        Raises:
            RuntimeError: If the processor is uninitialized or already shut down.
        """
        if not self._initialized:
            raise RuntimeError("OTEL: start_workunit called on uninitialized processor")
        if self._shutdown:
            raise RuntimeError("OTEL: start_workunit called on shutdown processor")
        if workunit.span_id in self._workunit_span_id_to_otel_span_id:
            self._increment_counter("multiple_start_workunit_for_span_id")
            return

        otel_span, _ = self._construct_otel_span(
            workunit_span_id=workunit.span_id,
            workunit_parent_span_id=workunit.primary_parent_id,
            name=workunit.name,
            start_time=workunit.start_time,
        )

        self._apply_incomplete_workunit_attributes(workunit=workunit, otel_span=otel_span)

    def complete_workunit(self, workunit: Workunit, *, context: ProcessorContext) -> None:
        """End the span for `workunit`, creating it first if it was never started.

        For the root workunit (no primary parent) the session metrics from `context` are
        attached as a JSON attribute and, when a template is configured, a trace link is logged.
        A repeated completion for the same workunit span ID is counted and otherwise ignored.

        Raises:
            RuntimeError: If the processor is uninitialized or already shut down.
        """
        if not self._initialized:
            raise RuntimeError("OTEL: complete_workunit called on uninitialized processor")
        if self._shutdown:
            raise RuntimeError("OTEL: complete_workunit called on shutdown processor")

        otel_span: Span
        known_otel_span_id = self._workunit_span_id_to_otel_span_id.get(workunit.span_id)
        if known_otel_span_id is None:
            # The workunit arrived already completed; build its span now.
            otel_span, otel_span_id = self._construct_otel_span(
                workunit_span_id=workunit.span_id,
                workunit_parent_span_id=workunit.primary_parent_id,
                name=workunit.name,
                start_time=workunit.start_time,
            )
        else:
            otel_span_id = known_otel_span_id
            live_span = self._otel_spans.get(otel_span_id)
            if live_span is None:
                # The ID mapping outlives the span, so a second completion would otherwise
                # fail with a KeyError. Count it and ignore it, as `start_workunit` does.
                self._increment_counter("multiple_complete_workunit_for_span_id")
                return
            otel_span = live_span

        self._apply_workunit_attributes(workunit=workunit, otel_span=otel_span)

        is_root = not workunit.primary_parent_id

        # Set the metrics for the session as an attribute of the root span.
        if is_root:
            metrics = context.get_metrics()
            otel_span.set_attribute(
                "pantsbuild.metrics-v0", json.dumps(metrics, sort_keys=True, cls=_Encoder)
            )

        otel_span.end(end_time=_datetime_to_otel_timestamp(workunit.end_time))

        del self._otel_spans[otel_span_id]
        self._span_count += 1

        # If this is the root span, then log any vendor trace link as a side effect.
        if is_root and self._trace_link_template:
            self._log_trace_link(
                root_span_id=otel_span_id,
                root_span_start_time=workunit.start_time,
                root_span_end_time=workunit.end_time,
            )

    def finish(
        self, timeout: datetime.timedelta | None = None, *, context: ProcessorContext
    ) -> None:
        """Flush pending spans, then shut down the span processor and the tracer provider.

        Args:
            timeout: Upper bound passed to the flush; 2000 ms when None.
            context: Not used by this method.

        Raises:
            RuntimeError: If the processor has already been shut down.
        """
        if self._shutdown:
            raise RuntimeError("OTEL: finish called on shutdown processor")
        logger.debug("OpenTelemetryProcessor requested to finish workunit transmission.")
        logger.debug(f"OpenTelemetry processing counters: {self._counters.items()}")
        if len(self._otel_spans) > 0:
            logger.warning(
                "Multiple OpenTelemetry spans have not been submitted as completed to the library."
            )
        timeout_millis: int = int(timeout.total_seconds() * 1000.0) if timeout is not None else 2000
        self._span_processor.force_flush(timeout_millis)
        self._span_processor.shutdown()
        self._tracer_provider.shutdown()
        self._shutdown = True
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc