pantsbuild / pants, build 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (down 62.4% from 80.3%)

Pull Request #22816: Update Pants internal Python to 3.14 (merge a12d75757 into 6c024e162)

4 of 5 new or added lines in 3 files covered (80.0%).
28452 existing lines in 683 files are now uncovered.
9831 of 55007 relevant lines covered (17.87%).
0.18 hits per line.
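
As a quick consistency check, the headline figures agree with one another. A small sketch in Python, using only the numbers reported above:

# Coverage is covered lines divided by relevant lines.
covered, relevant = 9831, 55007
print(f"{100 * covered / relevant:.3f}%")  # 17.872%

# The delta against the comparison branch:
print(f"{17.872 - 80.3:+.1f}%")  # -62.4%

# 0.18 hits per line is the same ratio, i.e. roughly one hit per covered line:
print(f"{covered / relevant:.2f}")  # 0.18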

Source file: /src/python/pants/goal/stats_aggregator.py (0.0% covered: every executable line below is uncovered in this build)
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import base64
import datetime
import json
import logging
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TypedDict

from hdrh.histogram import HdrHistogram

from pants.engine.internals.scheduler import Workunit
from pants.engine.rules import collect_rules, rule
from pants.engine.streaming_workunit_handler import (
    StreamingWorkunitContext,
    WorkunitsCallback,
    WorkunitsCallbackFactory,
    WorkunitsCallbackFactoryRequest,
)
from pants.engine.unions import UnionRule
from pants.option.option_types import BoolOption, EnumOption, StrOption
from pants.option.subsystem import Subsystem
from pants.util.collections import deep_getsizeof
from pants.util.dirutil import safe_open
from pants.util.strutil import softwrap

logger = logging.getLogger(__name__)

HISTOGRAM_PERCENTILES = [25, 50, 75, 90, 95, 99]


class CounterObject(TypedDict):
    name: str
    count: int


class MemorySummaryObject(TypedDict):
    name: str
    count: int
    bytes: int


class ObservationHistogramObject(TypedDict):
    name: str
    min: int
    max: int
    mean: float
    std_dev: float
    total_observations: int
    sum: int


class StatsObject(TypedDict, total=False):
    timestamp: str
    command: str
    counters: list[CounterObject]
    memory_summary: list[MemorySummaryObject]
    observation_histograms: list[ObservationHistogramObject]

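# Illustration (not part of this module): a StatsObject shaped per the TypedDicts
# above serializes to one JSON Lines record per Pants invocation. The timestamp,
# command, and counter name below are hypothetical values.
#
#   {"timestamp": "2025-11-02 17:33:00", "command": "pants test ::",
#    "counters": [{"name": "remote_cache_requests", "count": 0}]}
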

class StatsOutputFormat(Enum):
    """Output format for reporting Pants stats.

    text: Report stats in plain text.
    jsonlines: Report stats in JSON Lines text format.
    """

    text = "text"
    jsonlines = "jsonlines"


class StatsAggregatorSubsystem(Subsystem):
    options_scope = "stats"
    help = "An aggregator for Pants stats, such as cache metrics."

    log = BoolOption(
        default=False,
        help=softwrap(
            """
            At the end of the Pants run, log all counter metrics and summaries of
            observation histograms, e.g. the number of cache hits and the time saved by
            caching.

            For histogram summaries to work, you must add `hdrhistogram` to `[GLOBAL].plugins`.
            """
        ),
        advanced=True,
    )
    memory_summary = BoolOption(
        default=False,
        help=softwrap(
            """
            At the end of the Pants run, report a summary of memory usage.

            Keys are the total size in bytes, the count, and the name. Note that the total size
            is for all instances added together, so you can use total_size // count to get the
            average size.
            """
        ),
        advanced=True,
    )
    output_file = StrOption(
        default=None,
        metavar="<path>",
        help="Output the stats to this file. If unspecified, outputs to stdout.",
    )
    format = EnumOption(
        default=StatsOutputFormat.text,
        help="Output format for reporting stats.",
    )
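
# Illustration (not part of this module): the options above map to pants.toml
# roughly as follows; the output path is a hypothetical example, and `hdrhistogram`
# is only needed for histogram summaries, per the `log` help text.
#
#   [GLOBAL]
#   plugins = ["hdrhistogram"]
#
#   [stats]
#   log = true
#   memory_summary = true
#   output_file = "pants-stats.jsonl"
#   format = "jsonlines"
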
def _log_or_write_to_file_plain(output_file: str | None, lines: list[str]) -> None:
    """Log text to stdout or write it to the output file (plain text)."""
    if lines:
        text = "\n".join(lines)
        if output_file:
            with safe_open(output_file, "a") as fh:
                fh.write(text)
            logger.info(f"Wrote Pants stats to {output_file}")
        else:
            logger.info(text)


def _log_or_write_to_file_json(output_file: str | None, stats_object: StatsObject) -> None:
    """Log a single JSON Lines object to stdout or append it to the output file."""
    if not stats_object:
        return

    if not output_file:
        logger.info(json.dumps(stats_object))
        return

    jsonline = json.dumps(stats_object) + "\n"
    with safe_open(output_file, "a") as fh:
        fh.write(jsonline)
    logger.info(f"Wrote Pants stats to {output_file}")

class StatsAggregatorCallback(WorkunitsCallback):
    def __init__(
        self,
        *,
        log: bool,
        memory: bool,
        output_file: str | None,
        format: StatsOutputFormat,
    ) -> None:
        super().__init__()
        self.log = log
        self.memory = memory
        self.output_file = output_file
        self.format = format

    @property
    def can_finish_async(self) -> bool:
        # We need to finish synchronously for access to the console.
        return False

    def _output_stats_in_plain_text(self, context: StreamingWorkunitContext):
        output_lines = []
        if self.output_file:
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Leave an empty line between stats of different Pants invocations.
            space = "\n\n" if Path(self.output_file).exists() else ""
            output_lines.append(
                f"{space}{timestamp} Command: {context.run_tracker.run_information().get('cmd_line')}"
            )

        if self.log:
            # Capture global counters.
            counters = Counter(context.get_metrics())

            # Add any counters with a count of 0.
            for counter in context.run_tracker.counter_names:
                if counter not in counters:
                    counters[counter] = 0

            # Log aggregated counters.
            counter_lines = "\n".join(
                f"  {name}: {count}" for name, count in sorted(counters.items())
            )
            output_lines.append(f"Counters:\n{counter_lines}")

        if self.memory:
            ids: set[int] = set()
            count_by_type: Counter[type] = Counter()
            sizes_by_type: Counter[type] = Counter()

            items, rust_sizes = context._scheduler.live_items()
            for item in items:
                count_by_type[type(item)] += 1
                sizes_by_type[type(item)] += deep_getsizeof(item, ids)

            entries = [
                (size, count_by_type[typ], f"{typ.__module__}.{typ.__qualname__}")
                for typ, size in sizes_by_type.items()
            ]
            entries.extend(
                (size, count, f"(native) {name}") for name, (count, size) in rust_sizes.items()
            )
            memory_lines = "\n".join(
                f"  {size}\t\t{count}\t\t{name}" for size, count, name in sorted(entries)
            )
            output_lines.append(
                f"Memory summary (total size in bytes, count, name):\n{memory_lines}"
            )

        if not self.log:
            _log_or_write_to_file_plain(self.output_file, output_lines)
            return

        histograms = context.get_observation_histograms()["histograms"]
        if not histograms:
            output_lines.append("No observation histograms were recorded.")
            _log_or_write_to_file_plain(self.output_file, output_lines)
            return

        output_lines.append("Observation histogram summaries:")
        for name, encoded_histogram in histograms.items():
            # Note: The Python library for HDR Histogram will only decode compressed histograms
            # that are further encoded with base64. See
            # https://github.com/HdrHistogram/HdrHistogram_py/issues/29.
            histogram = HdrHistogram.decode(base64.b64encode(encoded_histogram))
            percentile_to_vals = "\n".join(
                f"  p{percentile}: {value}"
                for percentile, value in histogram.get_percentile_to_value_dict(
                    HISTOGRAM_PERCENTILES
                ).items()
            )
            output_lines.append(
                f"Summary of `{name}` observation histogram:\n"
                f"  min: {histogram.get_min_value()}\n"
                f"  max: {histogram.get_max_value()}\n"
                f"  mean: {histogram.get_mean_value():.3f}\n"
                f"  std dev: {histogram.get_stddev():.3f}\n"
                f"  total observations: {histogram.total_count}\n"
                f"  sum: {int(histogram.get_mean_value() * histogram.total_count)}\n"
                f"{percentile_to_vals}"
            )
        _log_or_write_to_file_plain(self.output_file, output_lines)

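    # Illustration (not part of this module): with `log` enabled and plain text
    # output, the appended block looks roughly like the following; the counter
    # names and values are hypothetical.
    #
    #   Counters:
    #     local_cache_requests: 10
    #     local_cache_requests_cached: 6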

    def _output_stats_in_json(self, context: StreamingWorkunitContext):
        stats_object: StatsObject = {}

        if self.output_file:
            stats_object["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            stats_object["command"] = context.run_tracker.run_information().get("cmd_line", "")

        if self.log:
            # Capture global counters.
            counters = Counter(context.get_metrics())

            # Add any counters with a count of 0.
            for counter in context.run_tracker.counter_names:
                if counter not in counters:
                    counters[counter] = 0

            # Log aggregated counters.
            stats_object["counters"] = [
                {"name": name, "count": count} for name, count in sorted(counters.items())
            ]

        if self.memory:
            ids: set[int] = set()
            count_by_type: Counter[type] = Counter()
            sizes_by_type: Counter[type] = Counter()

            items, rust_sizes = context._scheduler.live_items()
            for item in items:
                count_by_type[type(item)] += 1
                sizes_by_type[type(item)] += deep_getsizeof(item, ids)

            entries = [
                (size, count_by_type[typ], f"{typ.__module__}.{typ.__qualname__}")
                for typ, size in sizes_by_type.items()
            ]
            entries.extend(
                (size, count, f"(native) {name}") for name, (count, size) in rust_sizes.items()
            )
            memory_lines: list[MemorySummaryObject] = [
                {"bytes": size, "count": count, "name": name}
                for size, count, name in sorted(entries)
            ]
            stats_object["memory_summary"] = memory_lines

        if not self.log:
            _log_or_write_to_file_json(self.output_file, stats_object)
            return

        histograms = context.get_observation_histograms()["histograms"]
        if not histograms:
            stats_object["observation_histograms"] = []
            _log_or_write_to_file_json(self.output_file, stats_object)
            return

        observation_histograms: list[ObservationHistogramObject] = []
        for name, encoded_histogram in histograms.items():
            # Note: The Python library for HDR Histogram will only decode compressed histograms
            # that are further encoded with base64. See
            # https://github.com/HdrHistogram/HdrHistogram_py/issues/29.
            histogram = HdrHistogram.decode(base64.b64encode(encoded_histogram))
            percentile_to_vals = {
                f"p{percentile}": value
                for percentile, value in histogram.get_percentile_to_value_dict(
                    HISTOGRAM_PERCENTILES
                ).items()
            }

            observation_histogram: ObservationHistogramObject = {
                "name": name,
                "min": histogram.get_min_value(),
                "max": histogram.get_max_value(),
                "mean": round(histogram.get_mean_value(), 3),
                "std_dev": round(histogram.get_stddev(), 3),
                "total_observations": histogram.total_count,
                "sum": int(histogram.get_mean_value() * histogram.total_count),
                **percentile_to_vals,  # type: ignore
            }
            observation_histograms.append(observation_histogram)
        stats_object["observation_histograms"] = observation_histograms

        _log_or_write_to_file_json(self.output_file, stats_object)

    def __call__(
        self,
        *,
        started_workunits: tuple[Workunit, ...],
        completed_workunits: tuple[Workunit, ...],
        finished: bool,
        context: StreamingWorkunitContext,
    ) -> None:
        if not finished:
            return

        if StatsOutputFormat.text == self.format:
            self._output_stats_in_plain_text(context)
        elif StatsOutputFormat.jsonlines == self.format:
            self._output_stats_in_json(context)


@dataclass(frozen=True)
class StatsAggregatorCallbackFactoryRequest:
    """A unique request type that is installed to trigger construction of the WorkunitsCallback."""


@rule
async def construct_callback(
    _: StatsAggregatorCallbackFactoryRequest, subsystem: StatsAggregatorSubsystem
) -> WorkunitsCallbackFactory:
    return WorkunitsCallbackFactory(
        lambda: (
            StatsAggregatorCallback(
                log=subsystem.log,
                memory=subsystem.memory_summary,
                output_file=subsystem.output_file,
                format=subsystem.format,
            )
            if subsystem.log or subsystem.memory_summary
            else None
        )
    )


def rules():
    return [
        UnionRule(WorkunitsCallbackFactoryRequest, StatsAggregatorCallbackFactoryRequest),
        *collect_rules(),
    ]
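
The base64 quirk noted in the decode comments above is easy to see in isolation. A minimal round trip, assuming only the `hdrh` package (the value range and recorded values are arbitrary):

from hdrh.histogram import HdrHistogram

# Track values from 1 to 3,600,000 with 3 significant digits.
histogram = HdrHistogram(1, 60 * 60 * 1000, 3)
for value in (5, 10, 250):
    histogram.record_value(value)

# encode() already yields the base64 wire format that decode() expects; the Pants
# engine hands the callback raw compressed bytes, hence the b64encode step above.
decoded = HdrHistogram.decode(histogram.encode())
print(decoded.get_percentile_to_value_dict([25, 50, 75, 90, 95, 99]))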