pantsbuild / pants (Coveralls build 20632486505)
01 Jan 2026 04:21AM UTC. Coverage: 43.231% (-37.1%) from 80.281%
Pull Request #22962: Bump the gha-deps group across 1 directory with 6 updates (merge 08d5c63b0 into f52ab6675)
26122 of 60424 relevant lines covered (43.23%); 0.86 hits per line

Source file: /src/python/pants/goal/stats_aggregator.py (file coverage: 0.0%, no lines hit)
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import base64
import datetime
import json
import logging
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TypedDict

from hdrh.histogram import HdrHistogram

from pants.engine.internals.scheduler import Workunit
from pants.engine.rules import collect_rules, rule
from pants.engine.streaming_workunit_handler import (
    StreamingWorkunitContext,
    WorkunitsCallback,
    WorkunitsCallbackFactory,
    WorkunitsCallbackFactoryRequest,
)
from pants.engine.unions import UnionRule
from pants.option.option_types import BoolOption, EnumOption, StrOption
from pants.option.subsystem import Subsystem
from pants.util.collections import deep_getsizeof
from pants.util.dirutil import safe_open
from pants.util.strutil import softwrap

logger = logging.getLogger(__name__)

HISTOGRAM_PERCENTILES = [25, 50, 75, 90, 95, 99]


class CounterObject(TypedDict):
    name: str
    count: int


class MemorySummaryObject(TypedDict):
    name: str
    count: int
    bytes: int


class ObservationHistogramObject(TypedDict):
    name: str
    min: int
    max: int
    mean: float
    std_dev: float
    total_observations: int
    sum: int


class StatsObject(TypedDict, total=False):
    timestamp: str
    command: str
    counters: list[CounterObject]
    memory_summary: list[MemorySummaryObject]
    observation_histograms: list[ObservationHistogramObject]
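
# A minimal sketch of the shape one serialized StatsObject takes in
# `jsonlines` format, using the field names defined above; the values and the
# metric names are invented for illustration. (Real histogram entries also
# carry p25/p50/... percentile keys, merged in by the callback below.)
#
#   {"timestamp": "2026-01-01 04:21:00",
#    "command": "pants test ::",
#    "counters": [{"name": "local_cache_requests", "count": 7}],
#    "memory_summary": [{"name": "builtins.str", "count": 3, "bytes": 1024}],
#    "observation_histograms": [{"name": "cache_time_saved_ms", "min": 1, "max": 9,
#                                "mean": 4.5, "std_dev": 2.1,
#                                "total_observations": 12, "sum": 54}]}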


class StatsOutputFormat(Enum):
    """Output format for reporting Pants stats.

    text: Report stats in plain text.
    jsonlines: Report stats in JSON Lines text format.
    """

    text = "text"
    jsonlines = "jsonlines"


class StatsAggregatorSubsystem(Subsystem):
    options_scope = "stats"
    help = "An aggregator for Pants stats, such as cache metrics."

    log = BoolOption(
        default=False,
        help=softwrap(
            """
            At the end of the Pants run, log all counter metrics and summaries of
            observation histograms, e.g. the number of cache hits and the time saved by
            caching.

            For histogram summaries to work, you must add `hdrhistogram` to `[GLOBAL].plugins`.
            """
        ),
        advanced=True,
    )
    memory_summary = BoolOption(
        default=False,
        help=softwrap(
            """
            At the end of the Pants run, report a summary of memory usage.

            Keys are the total size in bytes, the count, and the name. Note that the total size
            is for all instances added together, so you can use total_size // count to get the
            average size.
            """
        ),
        advanced=True,
    )
    output_file = StrOption(
        default=None,
        metavar="<path>",
        help="Output the stats to this file. If unspecified, outputs to stdout.",
    )
    format = EnumOption(
        default=StatsOutputFormat.text,
        help="Output format for reporting stats.",
    )
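
# A minimal pants.toml sketch for turning this subsystem on: the `[stats]`
# section name follows `options_scope = "stats"` above, the `hdrhistogram`
# plugin requirement comes from the `log` option's help text, and the output
# file name is hypothetical.
#
#   [GLOBAL]
#   plugins = ["hdrhistogram"]
#
#   [stats]
#   log = true
#   memory_summary = true
#   output_file = "stats.jsonl"
#   format = "jsonlines"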


def _log_or_write_to_file_plain(output_file: str | None, lines: list[str]) -> None:
    """Send text to stdout or write it to the output file (plain text)."""
    if lines:
        text = "\n".join(lines)
        if output_file:
            with safe_open(output_file, "a") as fh:
                fh.write(text)
            logger.info(f"Wrote Pants stats to {output_file}")
        else:
            logger.info(text)


def _log_or_write_to_file_json(output_file: str | None, stats_object: StatsObject) -> None:
    """Send a single JSON Lines object to stdout or write it to the output file."""
    if not stats_object:
        return

    if not output_file:
        logger.info(json.dumps(stats_object))
        return

    jsonline = json.dumps(stats_object) + "\n"
    with safe_open(output_file, "a") as fh:
        fh.write(jsonline)
    logger.info(f"Wrote Pants stats to {output_file}")
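
# Because each run appends exactly one `json.dumps(...)` line, the jsonlines
# output file can be parsed line by line; a minimal consumer sketch (the path
# "stats.jsonl" is hypothetical):
#
#   with open("stats.jsonl") as fh:
#       runs = [json.loads(line) for line in fh]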


class StatsAggregatorCallback(WorkunitsCallback):
    def __init__(
        self,
        *,
        log: bool,
        memory: bool,
        output_file: str | None,
        format: StatsOutputFormat,
    ) -> None:
        super().__init__()
        self.log = log
        self.memory = memory
        self.output_file = output_file
        self.format = format

    @property
    def can_finish_async(self) -> bool:
        # We need to finish synchronously for access to the console.
        return False

    def _output_stats_in_plain_text(self, context: StreamingWorkunitContext):
        output_lines = []
        if self.output_file:
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Leave an empty line between the stats of different Pants invocations.
            space = "\n\n" if Path(self.output_file).exists() else ""
            output_lines.append(
                f"{space}{timestamp} Command: {context.run_tracker.run_information().get('cmd_line')}"
            )

        if self.log:
            # Capture global counters.
            counters = Counter(context.get_metrics())

            # Add any counters with a count of 0.
            for counter in context.run_tracker.counter_names:
                if counter not in counters:
                    counters[counter] = 0

            # Log aggregated counters.
            counter_lines = "\n".join(
                f"  {name}: {count}" for name, count in sorted(counters.items())
            )
            output_lines.append(f"Counters:\n{counter_lines}")

        if self.memory:
            ids: set[int] = set()
            count_by_type: Counter[type] = Counter()
            sizes_by_type: Counter[type] = Counter()

            items, rust_sizes = context._scheduler.live_items()
            for item in items:
                count_by_type[type(item)] += 1
                sizes_by_type[type(item)] += deep_getsizeof(item, ids)

            entries = [
                (size, count_by_type[typ], f"{typ.__module__}.{typ.__qualname__}")
                for typ, size in sizes_by_type.items()
            ]
            entries.extend(
                (size, count, f"(native) {name}") for name, (count, size) in rust_sizes.items()
            )
            memory_lines = "\n".join(
                f"  {size}\t\t{count}\t\t{name}" for size, count, name in sorted(entries)
            )
            output_lines.append(
                f"Memory summary (total size in bytes, count, name):\n{memory_lines}"
            )

        if not self.log:
            _log_or_write_to_file_plain(self.output_file, output_lines)
            return

        histograms = context.get_observation_histograms()["histograms"]
        if not histograms:
            output_lines.append("No observation histograms were recorded.")
            _log_or_write_to_file_plain(self.output_file, output_lines)
            return

        output_lines.append("Observation histogram summaries:")
        for name, encoded_histogram in histograms.items():
            # Note: The Python library for HDR Histogram will only decode compressed histograms
            # that are further encoded with base64. See
            # https://github.com/HdrHistogram/HdrHistogram_py/issues/29.
            histogram = HdrHistogram.decode(base64.b64encode(encoded_histogram))
            percentile_to_vals = "\n".join(
                f"  p{percentile}: {value}"
                for percentile, value in histogram.get_percentile_to_value_dict(
                    HISTOGRAM_PERCENTILES
                ).items()
            )
            output_lines.append(
                f"Summary of `{name}` observation histogram:\n"
                f"  min: {histogram.get_min_value()}\n"
                f"  max: {histogram.get_max_value()}\n"
                f"  mean: {histogram.get_mean_value():.3f}\n"
                f"  std dev: {histogram.get_stddev():.3f}\n"
                f"  total observations: {histogram.total_count}\n"
                f"  sum: {int(histogram.get_mean_value() * histogram.total_count)}\n"
                f"{percentile_to_vals}"
            )
        _log_or_write_to_file_plain(self.output_file, output_lines)
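
    # A sketch of the plain-text report the method above assembles, with
    # invented values, following its format strings (memory rows are
    # tab-separated):
    #
    #   Counters:
    #     local_cache_requests: 7
    #   Memory summary (total size in bytes, count, name):
    #     1024    3    builtins.str
    #   Summary of `cache_time_saved_ms` observation histogram:
    #     min: 1
    #     max: 9
    #     ...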

    def _output_stats_in_json(self, context: StreamingWorkunitContext):
        stats_object: StatsObject = {}

        if self.output_file:
            stats_object["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            stats_object["command"] = context.run_tracker.run_information().get("cmd_line", "")

        if self.log:
            # Capture global counters.
            counters = Counter(context.get_metrics())

            # Add any counters with a count of 0.
            for counter in context.run_tracker.counter_names:
                if counter not in counters:
                    counters[counter] = 0

            # Record aggregated counters.
            stats_object["counters"] = [
                {"name": name, "count": count} for name, count in sorted(counters.items())
            ]

        if self.memory:
            ids: set[int] = set()
            count_by_type: Counter[type] = Counter()
            sizes_by_type: Counter[type] = Counter()

            items, rust_sizes = context._scheduler.live_items()
            for item in items:
                count_by_type[type(item)] += 1
                sizes_by_type[type(item)] += deep_getsizeof(item, ids)

            entries = [
                (size, count_by_type[typ], f"{typ.__module__}.{typ.__qualname__}")
                for typ, size in sizes_by_type.items()
            ]
            entries.extend(
                (size, count, f"(native) {name}") for name, (count, size) in rust_sizes.items()
            )
            memory_lines: list[MemorySummaryObject] = [
                {"bytes": size, "count": count, "name": name}
                for size, count, name in sorted(entries)
            ]
            stats_object["memory_summary"] = memory_lines

        if not self.log:
            _log_or_write_to_file_json(self.output_file, stats_object)
            return

        histograms = context.get_observation_histograms()["histograms"]
        if not histograms:
            stats_object["observation_histograms"] = []
            _log_or_write_to_file_json(self.output_file, stats_object)
            return

        observation_histograms: list[ObservationHistogramObject] = []
        for name, encoded_histogram in histograms.items():
            # Note: The Python library for HDR Histogram will only decode compressed histograms
            # that are further encoded with base64. See
            # https://github.com/HdrHistogram/HdrHistogram_py/issues/29.
            histogram = HdrHistogram.decode(base64.b64encode(encoded_histogram))
            percentile_to_vals = {
                f"p{percentile}": value
                for percentile, value in histogram.get_percentile_to_value_dict(
                    HISTOGRAM_PERCENTILES
                ).items()
            }

            observation_histogram: ObservationHistogramObject = {
                "name": name,
                "min": histogram.get_min_value(),
                "max": histogram.get_max_value(),
                "mean": round(histogram.get_mean_value(), 3),
                "std_dev": round(histogram.get_stddev(), 3),
                "total_observations": histogram.total_count,
                "sum": int(histogram.get_mean_value() * histogram.total_count),
                **percentile_to_vals,  # type: ignore
            }
            observation_histograms.append(observation_histogram)
        stats_object["observation_histograms"] = observation_histograms

        _log_or_write_to_file_json(self.output_file, stats_object)

    def __call__(
        self,
        *,
        started_workunits: tuple[Workunit, ...],
        completed_workunits: tuple[Workunit, ...],
        finished: bool,
        context: StreamingWorkunitContext,
    ) -> None:
        if not finished:
            return

        if StatsOutputFormat.text == self.format:
            self._output_stats_in_plain_text(context)
        elif StatsOutputFormat.jsonlines == self.format:
            self._output_stats_in_json(context)


@dataclass(frozen=True)
class StatsAggregatorCallbackFactoryRequest:
    """A unique request type that is installed to trigger construction of the WorkunitsCallback."""


@rule
async def construct_callback(
    _: StatsAggregatorCallbackFactoryRequest, subsystem: StatsAggregatorSubsystem
) -> WorkunitsCallbackFactory:
    return WorkunitsCallbackFactory(
        lambda: (
            StatsAggregatorCallback(
                log=subsystem.log,
                memory=subsystem.memory_summary,
                output_file=subsystem.output_file,
                format=subsystem.format,
            )
            if subsystem.log or subsystem.memory_summary
            else None
        )
    )


def rules():
    return [
        UnionRule(WorkunitsCallbackFactoryRequest, StatsAggregatorCallbackFactoryRequest),
        *collect_rules(),
    ]
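
Note that the factory above returns None unless `[stats].log` or `[stats].memory_summary`
is enabled, so the callback is only installed when stats reporting was actually
requested. Following the usual Pants scope-prefixed option naming, the corresponding
command-line flags would presumably be `--stats-log`, `--stats-memory-summary`,
`--stats-output-file`, and `--stats-format`.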