• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26260209689

21 May 2026 11:59PM UTC coverage: 75.453% (-15.7%) from 91.156%
26260209689

Pull #23365

github

web-flow
Merge 5fe873b58 into 7ea655ba0
Pull Request #23365: uv.lock -> pex optimization

5 of 16 new or added lines in 1 file covered. (31.25%)

10118 existing lines in 378 files now uncovered.

54669 of 72454 relevant lines covered (75.45%)

2.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/observability/opentelemetry/workunit_handler.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import datetime
×
UNCOV
7
from typing import Any, Mapping
×
8

UNCOV
9
from pants.backend.observability.opentelemetry.processor import (
×
10
    IncompleteWorkunit,
11
    Level,
12
    Processor,
13
    ProcessorContext,
14
    Workunit,
15
)
UNCOV
16
from pants.engine.internals.native_engine import all_counter_names
×
UNCOV
17
from pants.engine.internals.scheduler import Workunit as RawWorkunit
×
UNCOV
18
from pants.engine.streaming_workunit_handler import StreamingWorkunitContext, WorkunitsCallback
×
UNCOV
19
from pants.util.frozendict import FrozenDict
×
20

21

UNCOV
22
class _TelemetryContext(ProcessorContext):
×
UNCOV
23
    def __init__(self, pants_context: StreamingWorkunitContext) -> None:
×
24
        self._pants_context = pants_context
×
25

UNCOV
26
    def get_metrics(self) -> Mapping[str, int]:
×
27
        metric_names = all_counter_names()
×
28
        metrics = self._pants_context.get_metrics()
×
29
        for metric_name in metric_names:
×
30
            if metric_name not in metrics:
×
31
                metrics[metric_name] = 0
×
32
        return metrics
×
33

34

UNCOV
35
class TelemetryWorkunitsCallback(WorkunitsCallback):
×
UNCOV
36
    def __init__(
×
37
        self,
38
        processor: Processor,
39
        *,
40
        finish_timeout: datetime.timedelta,
41
        async_completion: bool,
42
    ) -> None:
UNCOV
43
        self.processor: Processor = processor
×
UNCOV
44
        self.finish_timeout = finish_timeout
×
UNCOV
45
        self.async_completion = async_completion
×
46

UNCOV
47
    @property
×
UNCOV
48
    def can_finish_async(self) -> bool:
×
49
        return self.async_completion
×
50

UNCOV
51
    def _convert_time(self, seconds: int, nanoseconds: int) -> datetime.datetime:
×
52
        t = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.UTC)
×
53
        t = t + datetime.timedelta(seconds=seconds, microseconds=nanoseconds // 1000)
×
54
        return t
×
55

UNCOV
56
    def _convert_incomplete_workunit(self, raw_workunit: RawWorkunit) -> IncompleteWorkunit:
×
57
        return IncompleteWorkunit(
×
58
            name=raw_workunit["name"],
59
            span_id=raw_workunit["span_id"],
60
            parent_ids=tuple(raw_workunit["parent_ids"]),
61
            level=Level(raw_workunit["level"]),
62
            description=raw_workunit.get("description"),
63
            start_time=self._convert_time(raw_workunit["start_secs"], raw_workunit["start_nanos"]),
64
        )
65

UNCOV
66
    def _convert_completed_workunit(self, raw_workunit: RawWorkunit) -> Workunit:
×
67
        start_time = self._convert_time(raw_workunit["start_secs"], raw_workunit["start_nanos"])
×
68
        end_time = start_time + datetime.timedelta(
×
69
            seconds=raw_workunit["duration_secs"],
70
            microseconds=raw_workunit["duration_nanos"] // 1000,
71
        )
72
        return Workunit(
×
73
            name=raw_workunit["name"],
74
            span_id=raw_workunit["span_id"],
75
            parent_ids=tuple(raw_workunit["parent_ids"]),
76
            level=Level(raw_workunit["level"]),
77
            description=raw_workunit.get("description"),
78
            start_time=start_time,
79
            end_time=end_time,
80
            metadata=FrozenDict.deep_freeze(raw_workunit.get("metadata", {})),
81
        )
82

UNCOV
83
    def __call__(
×
84
        self,
85
        *,
86
        completed_workunits: tuple[RawWorkunit, ...],
87
        started_workunits: tuple[RawWorkunit, ...],
88
        context: StreamingWorkunitContext,
89
        finished: bool = False,
90
        **kwargs: Any,
91
    ) -> None:
92
        telemetry_context = _TelemetryContext(context)
×
93

94
        for started_workunit in started_workunits:
×
95
            self.processor.start_workunit(
×
96
                self._convert_incomplete_workunit(started_workunit), context=telemetry_context
97
            )
98

99
        for completed_workunit in completed_workunits:
×
100
            self.processor.complete_workunit(
×
101
                self._convert_completed_workunit(completed_workunit), context=telemetry_context
102
            )
103

104
        if finished:
×
105
            self.processor.finish(timeout=self.finish_timeout, context=telemetry_context)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc