• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26260209689

21 May 2026 11:59PM UTC coverage: 75.453% (-15.7%) from 91.156%
26260209689

Pull #23365

github

web-flow
Merge 5fe873b58 into 7ea655ba0
Pull Request #23365: uv.lock -> pex optimization

5 of 16 new or added lines in 1 file covered. (31.25%)

10118 existing lines in 378 files now uncovered.

54669 of 72454 relevant lines covered (75.45%)

2.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/observability/opentelemetry/opentelemetry_processor.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import datetime
×
UNCOV
7
import json
×
UNCOV
8
import logging
×
UNCOV
9
import os
×
UNCOV
10
import typing
×
UNCOV
11
from contextlib import contextmanager
×
UNCOV
12
from pathlib import Path
×
UNCOV
13
from typing import TextIO
×
14

UNCOV
15
from opentelemetry import trace
×
UNCOV
16
from opentelemetry.context import Context
×
UNCOV
17
from opentelemetry.exporter.otlp.proto.http import Compression
×
UNCOV
18
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
×
19
    OTLPSpanExporter as HttpOTLPSpanExporter,
20
)
UNCOV
21
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
×
UNCOV
22
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, TracerProvider, sampling
×
UNCOV
23
from opentelemetry.sdk.trace.export import (
×
24
    BatchSpanProcessor,
25
    SpanExporter,
26
    SpanExportResult,
27
)
UNCOV
28
from opentelemetry.trace import Link, TraceFlags
×
UNCOV
29
from opentelemetry.trace.span import (
×
30
    NonRecordingSpan,
31
    Span,
32
    SpanContext,
33
    format_span_id,
34
    format_trace_id,
35
)
UNCOV
36
from opentelemetry.trace.status import StatusCode
×
UNCOV
37
from pants.backend.observability.opentelemetry.opentelemetry_config import OtlpParameters
×
UNCOV
38
from pants.backend.observability.opentelemetry.processor import (
×
39
    IncompleteWorkunit,
40
    Level,
41
    Processor,
42
    ProcessorContext,
43
    Workunit,
44
)
UNCOV
45
from pants.backend.observability.opentelemetry.subsystem import TracingExporterId
×
UNCOV
46
from pants.util.frozendict import FrozenDict
×
47

UNCOV
48
logger = logging.getLogger(__name__)
×
49

UNCOV
50
_UNIX_EPOCH = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.UTC)
×
51

52

UNCOV
53
@contextmanager
×
UNCOV
54
def _temp_env_var(key: str, value: str | None):
×
55
    """Temporarily set an environment variable, restoring the original value
56
    afterward."""
UNCOV
57
    old_value = os.environ.get(key)
×
UNCOV
58
    try:
×
UNCOV
59
        if value is not None:
×
60
            os.environ[key] = value
×
UNCOV
61
        yield
×
62
    finally:
UNCOV
63
        if old_value is None:
×
UNCOV
64
            os.environ.pop(key, None)
×
65
        else:
66
            os.environ[key] = old_value
×
67

68

UNCOV
69
def _datetime_to_otel_timestamp(d: datetime.datetime) -> int:
    """Convert `d` to an OpenTelemetry timestamp: integer nanoseconds since the Unix epoch.

    Precision is microseconds (the resolution of `datetime`). `d` must be timezone-aware,
    since it is subtracted from the aware `_UNIX_EPOCH`.
    """
    elapsed = d - _UNIX_EPOCH
    # Floor-dividing two timedeltas gives an exact integer count of microseconds.
    elapsed_micros = elapsed // datetime.timedelta(microseconds=1)
    return elapsed_micros * 1_000
×
76

77

UNCOV
78
class JsonFileSpanExporter(SpanExporter):
    """Span exporter that appends each span as a single line of JSON to a text file.

    The exporter takes ownership of `file`: `shutdown()` closes it. Once the file is closed,
    `export()` reports failure instead of raising, as the SDK exporter contract requires
    for calls made after shutdown.
    """

    def __init__(self, file: TextIO) -> None:
        self._file = file

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        if self._file.closed:
            # Export after Shutdown must return a failure result, not raise.
            return SpanExportResult.FAILURE
        for span in spans:
            # `to_json(indent=0)` still separates members with newlines; `json` escapes any
            # newline inside string values, so folding the raw newlines into spaces keeps
            # exactly one span per line.
            self._file.write(span.to_json(indent=0).replace("\n", " ") + "\n")
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        # Closing flushes buffered output; closing an already-closed file is a no-op.
        self._file.close()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # After shutdown everything was already flushed by `close()`; flushing a closed
        # file would raise ValueError.
        if not self._file.closed:
            self._file.flush()
        return True
×
93

94

UNCOV
95
def _create_span_exporter(
    span_exporter_name: TracingExporterId,
    otlp_parameters: OtlpParameters,
    build_root: Path,
    json_file: str | None,
) -> SpanExporter:
    """Instantiate the span exporter selected by `span_exporter_name`.

    Raises:
        ValueError: if the JSON file exporter is selected but `json_file` is empty/None.
        AssertionError: if `span_exporter_name` is not a supported exporter.
    """
    if span_exporter_name == TracingExporterId.OTLP:
        return HttpOTLPSpanExporter(
            endpoint=otlp_parameters.resolve_traces_endpoint(),
            certificate_file=otlp_parameters.certificate_file,
            client_key_file=otlp_parameters.client_key_file,
            client_certificate_file=otlp_parameters.client_certificate_file,
            headers=dict(otlp_parameters.headers) if otlp_parameters.headers else None,
            timeout=otlp_parameters.timeout,
            compression=Compression(otlp_parameters.compression),
        )

    if span_exporter_name == TracingExporterId.JSON_FILE:
        if not json_file:
            raise ValueError(
                f"`--opentelemetry-exporter` is set to `{TracingExporterId.JSON_FILE}` "
                "but the `--opentelemetry-json-file` option is not set."
            )
        # Relative paths are resolved against the build root; create parent dirs on demand.
        json_file_path = build_root / json_file
        json_file_path.parent.mkdir(parents=True, exist_ok=True)
        exporter = JsonFileSpanExporter(open(json_file_path, "w"))
        logger.debug(f"Enabling OpenTelemetry JSON file span exporter: path={json_file_path}")
        return exporter

    raise AssertionError(f"Unknown span exporter type: {span_exporter_name.value}")


def get_processor(
    span_exporter_name: TracingExporterId,
    otlp_parameters: OtlpParameters,
    build_root: Path,
    traceparent_env_var: str | None,
    otel_resource_attributes: str | None,
    json_file: str | None,
    trace_link_template: str | None,
) -> Processor:
    """Assemble an `OpenTelemetryProcessor` backed by the requested span exporter.

    Builds a resource named "pantsbuild", an always-on sampling `TracerProvider`, the
    selected exporter behind a `BatchSpanProcessor`, and returns a processor wired to them.

    Raises:
        ValueError: if the JSON file exporter is selected without a JSON file path.
        AssertionError: if `span_exporter_name` is not a supported exporter.
    """
    logger.debug(f"OTEL: get_processor: otlp_parameters={otlp_parameters}; build_root={build_root}")

    # `Resource.create()` merges OTEL_RESOURCE_ATTRIBUTES from `os.environ`, so expose the
    # supplied value there only while the resource is being built.
    with _temp_env_var("OTEL_RESOURCE_ATTRIBUTES", otel_resource_attributes):
        resource = Resource.create(attributes={SERVICE_NAME: "pantsbuild"})

    tracer_provider = TracerProvider(
        sampler=sampling.ALWAYS_ON, resource=resource, shutdown_on_exit=False
    )
    tracer = tracer_provider.get_tracer(__name__)

    span_exporter = _create_span_exporter(
        span_exporter_name, otlp_parameters, build_root, json_file
    )

    # Batching parameters: queue up to 512 spans, export batches of up to 100, 30s scheduled
    # delay between exports, 5s export timeout.
    span_processor = BatchSpanProcessor(
        span_exporter=span_exporter,
        max_queue_size=512,
        max_export_batch_size=100,
        export_timeout_millis=5000,
        schedule_delay_millis=30000,
    )
    tracer_provider.add_span_processor(span_processor)

    return OpenTelemetryProcessor(
        tracer=tracer,
        span_processor=span_processor,
        traceparent_env_var=traceparent_env_var,
        tracer_provider=tracer_provider,
        trace_link_template=trace_link_template,
    )
×
162

163

UNCOV
164
class DummySpan(NonRecordingSpan):
    """Stand-in span that carries only a `SpanContext`.

    Installing one as the current span of an OpenTelemetry `Context` makes the tracer treat
    its span ID as the parent of the next span it starts. Unlike a plain `NonRecordingSpan`,
    it claims to be recording (`is_recording()` returns True).
    """

    def __repr__(self) -> str:
        return f"DummySpan({self._context!r})"

    def is_recording(self) -> bool:
        return True
×
176

177

UNCOV
178
def _parse_id(id_hex: str, id_hex_chars_len: int) -> int:
×
179
    # Remove any potential formatting like hyphens or "0x" prefix
180
    id_hex = id_hex.replace("-", "").replace("0x", "").lower()
×
181

182
    # Check if the length is correct for the given ID type.
183
    if len(id_hex) != id_hex_chars_len:
×
184
        raise ValueError(
×
185
            f"Invalid ID length: expected {id_hex_chars_len} hex chars, got {len(id_hex)} instead."
186
        )
187

188
    # Convert hex string to integer
189
    return int(id_hex, 16)
×
190

191

UNCOV
192
def _parse_traceparent(value: str) -> tuple[int, int] | None:
    """Extract the (trace ID, span ID) pair from a W3C Trace Context `traceparent` value.

    The expected form is `{version}-{trace_id}-{span_id}-{flags}`; only the 32-hex-char
    trace ID and the 16-hex-char span ID are used. Malformed input never raises: a warning
    is logged (except for a blank value) and None is returned.
    """
    parts = value.split("-")
    if len(parts) < 3:
        # A blank value most likely means "no parent trace"; warn only about malformed input,
        # consistent with the trace/span ID failure paths below.
        if value.strip():
            logger.warning(
                f"Ignoring TRACEPARENT `{value}` because it is not of the form "
                "`version-trace_id-span_id-flags`."
            )
        return None

    try:
        trace_id = _parse_id(parts[1], 32)
    except ValueError as e:
        logger.warning(f"Ignoring TRACEPARENT due to failure to parse trace ID `{parts[1]}`: {e}")
        return None

    try:
        span_id = _parse_id(parts[2], 16)
    except ValueError as e:
        logger.warning(f"Ignoring TRACEPARENT due to failure to parse span ID `{parts[2]}`: {e}")
        return None

    return trace_id, span_id
×
210

211

UNCOV
212
class _Encoder(json.JSONEncoder):
×
UNCOV
213
    def default(self, o):
×
214
        if isinstance(o, FrozenDict):
×
215
            return o._data
×
216
        return super().default(o)
×
217

218

UNCOV
219
class OpenTelemetryProcessor(Processor):
    """A `Processor` that mirrors Pants workunits as OpenTelemetry spans.

    Every workunit becomes one span, started via `tracer`. All spans share the trace ID that
    the tracer generated for the first span constructed. A workunit's primary parent becomes
    the span's parent. The root span is linked to any parent trace given via TRACEPARENT.
    `finish()` flushes `span_processor` and shuts down the pipeline.
    """

    def __init__(
        self,
        tracer: trace.Tracer,
        span_processor: SpanProcessor,
        traceparent_env_var: str | None,
        tracer_provider: TracerProvider,
        trace_link_template: str | None,
    ) -> None:
        self._tracer = tracer
        self._tracer_provider = tracer_provider
        # Trace ID chosen by the tracer for the first span; reused as the parent trace for all others.
        self._trace_id: int | None = None
        # Workunit span ID -> OpenTelemetry span ID, for every span ever constructed. Entries
        # are never removed, so children can still resolve an already-completed parent.
        self._workunit_span_id_to_otel_span_id: dict[str, int] = {}
        # Spans that have been started but not yet ended, keyed by OpenTelemetry span ID.
        self._otel_spans: dict[int, trace.Span] = {}
        self._span_processor = span_processor
        # Number of spans ended so far.
        self._span_count: int = 0
        # Diagnostic counters for unusual workunit event sequences; logged by `finish()`.
        self._counters: dict[str, int] = {}
        self._trace_link_template: str | None = trace_link_template
        self._initialized: bool = False
        self._shutdown: bool = False

        # Optional parent trace context from TRACEPARENT, recorded as a link on the root span.
        self._parent_trace_id: int | None = None
        self._parent_span_id: int | None = None
        if traceparent_env_var is not None:
            ids = _parse_traceparent(traceparent_env_var)
            if ids is not None:
                self._parent_trace_id, self._parent_span_id = ids

    def initialize(self) -> None:
        """Mark the processor as ready to receive workunits.

        Raises:
            RuntimeError: if called more than once.
        """
        if self._initialized:
            raise RuntimeError("OTEL: processor already initialized")
        logger.debug("OpenTelemetryProcessor.initialize called")
        self._initialized = True

    def _increment_counter(self, name: str, delta: int = 1) -> None:
        """Add `delta` to diagnostic counter `name`, starting from zero if it is new."""
        self._counters[name] = self._counters.get(name, 0) + delta

    def _log_trace_link(
        self,
        root_span_id: int,
        root_span_start_time: datetime.datetime,
        root_span_end_time: datetime.datetime,
    ) -> None:
        """Render the trace link template, if one is configured, and log the link at INFO.

        The template may reference `{trace_id}` and `{root_span_id}` (hex-formatted), and
        `{trace_start_ms}` / `{trace_end_ms}` (milliseconds since the Unix epoch).
        """
        template = self._trace_link_template
        if not template:
            return

        replacements = {
            "trace_id": format_trace_id(self._trace_id) if self._trace_id else "UNKNOWN",
            "root_span_id": format_span_id(root_span_id),
            "trace_start_ms": str(int(root_span_start_time.timestamp() * 1000)),
            "trace_end_ms": str(int(root_span_end_time.timestamp() * 1000)),
        }
        try:
            trace_link = template.format(**replacements)
        except (KeyError, IndexError, AttributeError, ValueError) as e:
            # NOTE: the template presumably comes from user configuration. A typo in it (an
            # unknown placeholder or a stray brace) should produce a warning, not make
            # completion of the root workunit fail.
            logger.warning(
                f"Ignoring OpenTelemetry trace link template `{template}` "
                f"because it could not be rendered: {e!r}"
            )
            return
        logger.info(f"OpenTelemetry trace link: {trace_link}")

    def _construct_otel_span(
        self,
        *,
        workunit_span_id: str,
        workunit_parent_span_id: str | None,
        name: str,
        start_time: datetime.datetime,
    ) -> tuple[Span, int]:
        """Construct an OpenTelemetry span for a workunit and register it.

        Shared between `start_workunit` and `complete_workunit`, since some workunits arrive
        already completed. Returns the new span and its OpenTelemetry span ID.

        Raises:
            KeyError: if `workunit_parent_span_id` refers to a workunit never seen before.
        """
        assert workunit_span_id not in self._workunit_span_id_to_otel_span_id

        otel_context = Context()
        if workunit_parent_span_id:
            # OpenTelemetry pulls the parent span ID from the span set as "current" in the supplied context.
            assert self._trace_id is not None
            otel_parent_span_context = SpanContext(
                trace_id=self._trace_id,
                span_id=self._workunit_span_id_to_otel_span_id[workunit_parent_span_id],
                is_remote=False,
            )
            otel_context = trace.set_span_in_context(
                DummySpan(otel_parent_span_context), context=otel_context
            )

        # Record a "link" on the root span to any parent trace set via TRACEPARENT.
        links: list[Link] = []
        if not workunit_parent_span_id and self._parent_trace_id and self._parent_span_id:
            parent_trace_id_context = SpanContext(
                trace_id=self._parent_trace_id,
                span_id=self._parent_span_id,
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
            )
            links.append(Link(context=parent_trace_id_context))

        otel_span = self._tracer.start_span(
            name=name,
            context=otel_context,
            start_time=_datetime_to_otel_timestamp(start_time),
            record_exception=False,
            set_status_on_exception=False,
            links=links,
        )

        # Record the span ID chosen by the tracer for this span.
        otel_span_context = otel_span.get_span_context()
        otel_span_id = otel_span_context.span_id
        self._workunit_span_id_to_otel_span_id[workunit_span_id] = otel_span_id
        self._otel_spans[otel_span_id] = otel_span

        # Record the trace ID generated the first time any span is constructed.
        if self._trace_id is None:
            self._trace_id = otel_span_context.trace_id

        return otel_span, otel_span_id

    def _apply_incomplete_workunit_attributes(
        self, workunit: IncompleteWorkunit, otel_span: Span
    ) -> None:
        """Copy the attributes known when a workunit starts (IDs, level) onto `otel_span`."""
        otel_span.set_attribute("pantsbuild.workunit.span_id", workunit.span_id)
        otel_span.set_attribute("pantsbuild.workunit.parent_span_ids", workunit.parent_ids)

        otel_span.set_attribute("pantsbuild.workunit.level", workunit.level.value.upper())
        if workunit.level == Level.ERROR:
            otel_span.set_status(StatusCode.ERROR)

    def _apply_workunit_attributes(self, workunit: Workunit, otel_span: Span) -> None:
        """Copy all attributes of a completed workunit, including scalar metadata, onto `otel_span`."""
        self._apply_incomplete_workunit_attributes(workunit=workunit, otel_span=otel_span)

        for key, value in workunit.metadata.items():
            # Only scalar metadata values are exported; other value types are skipped.
            if isinstance(value, (str, bool, int, float)):
                otel_span.set_attribute(f"pantsbuild.workunit.metadata.{key}", value)

    def start_workunit(self, workunit: IncompleteWorkunit, *, context: ProcessorContext) -> None:
        """Start a span for a newly started workunit.

        A repeated start for an already-known span ID is counted and otherwise ignored.

        Raises:
            RuntimeError: if the processor is not initialized or already shut down.
        """
        if not self._initialized:
            raise RuntimeError("OTEL: start_workunit called on uninitialized processor")
        if self._shutdown:
            raise RuntimeError("OTEL: start_workunit called on shutdown processor")
        if workunit.span_id in self._workunit_span_id_to_otel_span_id:
            self._increment_counter("multiple_start_workunit_for_span_id")
            return

        otel_span, _ = self._construct_otel_span(
            workunit_span_id=workunit.span_id,
            workunit_parent_span_id=workunit.primary_parent_id,
            name=workunit.name,
            start_time=workunit.start_time,
        )

        self._apply_incomplete_workunit_attributes(workunit=workunit, otel_span=otel_span)

    def complete_workunit(self, workunit: Workunit, *, context: ProcessorContext) -> None:
        """End the span for a completed workunit, creating it first if it was never started.

        For the root workunit (no primary parent), the session metrics are attached to the span
        and the trace link, if configured, is logged. A repeated completion for the same span
        ID is counted and otherwise ignored.

        Raises:
            RuntimeError: if the processor is not initialized or already shut down.
        """
        if not self._initialized:
            raise RuntimeError("OTEL: complete_workunit called on uninitialized processor")
        if self._shutdown:
            raise RuntimeError("OTEL: complete_workunit called on shutdown processor")
        otel_span: Span
        otel_span_id: int
        if workunit.span_id in self._workunit_span_id_to_otel_span_id:
            otel_span_id = self._workunit_span_id_to_otel_span_id[workunit.span_id]
            existing_span = self._otel_spans.get(otel_span_id)
            if existing_span is None:
                # Already completed: its span was ended and removed from `_otel_spans`. Count
                # the duplicate (mirroring `start_workunit`) instead of failing with KeyError.
                self._increment_counter("multiple_complete_workunit_for_span_id")
                return
            otel_span = existing_span
        else:
            otel_span, otel_span_id = self._construct_otel_span(
                workunit_span_id=workunit.span_id,
                workunit_parent_span_id=workunit.primary_parent_id,
                name=workunit.name,
                start_time=workunit.start_time,
            )

        self._apply_workunit_attributes(workunit=workunit, otel_span=otel_span)

        # Set the metrics for the session as an attribute of the root span.
        if not workunit.primary_parent_id:
            metrics = context.get_metrics()
            otel_span.set_attribute(
                "pantsbuild.metrics-v0", json.dumps(metrics, sort_keys=True, cls=_Encoder)
            )

        otel_span.end(end_time=_datetime_to_otel_timestamp(workunit.end_time))

        del self._otel_spans[otel_span_id]
        self._span_count += 1

        # If this is the root span, then log any vendor trace link as a side effect.
        if not workunit.primary_parent_id and self._trace_link_template:
            self._log_trace_link(
                root_span_id=otel_span_id,
                root_span_start_time=workunit.start_time,
                root_span_end_time=workunit.end_time,
            )

    def finish(
        self, timeout: datetime.timedelta | None = None, *, context: ProcessorContext
    ) -> None:
        """Flush outstanding spans and shut down the span processor and tracer provider.

        Args:
            timeout: how long the flush may take; defaults to 2 seconds when None.

        Raises:
            RuntimeError: if the processor was already shut down.
        """
        if self._shutdown:
            raise RuntimeError("OTEL: finish called on shutdown processor")
        logger.debug("OpenTelemetryProcessor requested to finish workunit transmission.")
        logger.debug(f"OpenTelemetry processing counters: {self._counters.items()}")
        unfinished = len(self._otel_spans)
        if unfinished > 0:
            logger.warning(
                f"{unfinished} OpenTelemetry span(s) have not been submitted as completed to the library."
            )
        timeout_millis: int = int(timeout.total_seconds() * 1000.0) if timeout is not None else 2000
        self._span_processor.force_flush(timeout_millis)
        # NOTE(review): when `span_processor` is also registered with `tracer_provider`, the
        # provider's shutdown likely shuts it down a second time; confirm this is harmless
        # for the exporters in use.
        self._span_processor.shutdown()
        self._tracer_provider.shutdown()
        self._shutdown = True
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc