• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.37
/src/python/pants/util/collections.py
1
# Copyright 2017 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import collections
1✔
7
import collections.abc
1✔
8
import gc
1✔
9
import math
1✔
10
from collections.abc import Callable, Iterable, Iterator, MutableMapping
1✔
11
from sys import getsizeof
1✔
12
from typing import Any, TypeVar
1✔
13

14
from pants.engine.internals import native_engine
1✔
15
from pants.util.strutil import softwrap
1✔
16

17

18
def recursively_update(d: MutableMapping, d2: MutableMapping) -> None:
    """Merge `d2` into `d` in place, like `dict.update` but merging nested dicts.

    For every key of `d2`:

    * if the value is a `dict` and `d` already holds a mutable mapping under the same key, the
      two are merged recursively;
    * otherwise the value from `d2` is stored in `d`, replacing whatever was there (`d2` takes
      precedence where there is a conflict).

    Values are stored by reference, not copied, so a nested dict taken from `d2` is shared
    between `d` and `d2` after the call.

    :param d: The mapping to update; it is mutated.
    :param d2: The mapping whose entries are merged into `d`; it is only read.
    """
    for k, v in d2.items():
        # Only recurse when both sides are mappings. If the existing value is a scalar (or any
        # other non-mapping), recursing into it would fail, so the incoming value simply wins.
        if isinstance(v, dict) and k in d and isinstance(d[k], MutableMapping):
            recursively_update(d[k], v)
        else:
            d[k] = v
×
26

27

28
def deep_getsizeof(o: Any, ids: set[int]) -> int:
    """Find the memory footprint of the given object, including the objects it refers to.

    The object graph is walked via `gc.get_referents`, summing `sys.getsizeof` for every object
    that has not been seen before.

    To avoid double-counting, `ids` should be a set of object ids which have been visited by
    previous calls to this method. It is updated in place with the id of every object counted
    by this call.

    :param o: The root object to measure.
    :param ids: Ids of objects that were already counted; mutated by this call.
    :return: The summed size in bytes of all newly visited objects, or 0 if `o` was already in
        `ids`.
    """
    total = 0
    # An explicit work list is used instead of recursion so that deeply nested structures
    # cannot hit the interpreter's recursion limit.
    pending = [o]
    while pending:
        current = pending.pop()
        if id(current) in ids:
            continue
        ids.add(id(current))
        total += getsizeof(current)
        # `gc.get_referents` must be given the object: called without arguments it returns an
        # empty list, which would turn this into a shallow `getsizeof`.
        pending.extend(gc.get_referents(current))
    return total
×
42

43

44
# Generic element type used in the signatures of the collection helpers in this module.
_T = TypeVar("_T")
1✔
45

46

47
def assert_single_element(iterable: Iterable[_T]) -> _T:
    """Return the only element of `iterable`, failing if it has zero or several elements.

    :raise: :class:`StopIteration` if there is no element.
    :raise: :class:`ValueError` if there is more than one element.
    """
    iterator = iter(iterable)
    # Deliberately unguarded: an empty iterable surfaces as StopIteration.
    only = next(iterator)

    # A unique sentinel distinguishes "exhausted" from any legitimate second element.
    exhausted = object()
    if next(iterator, exhausted) is exhausted:
        return only

    raise ValueError(f"iterable {iterable!r} has more than one element.")
×
62

63

64
def ensure_list(
    val: Any | Iterable[Any], *, expected_type: type[_T], allow_single_scalar: bool = False
) -> list[_T]:
    """Validate that `val` is an iterable of `expected_type` instances and return it as a list.

    If `allow_single_scalar` is True, a single value T will be wrapped into a `List[T]`;
    otherwise a bare T is rejected.

    :raise: :class:`ValueError` if `val` is a bare scalar that is not allowed, is not iterable,
        or contains an element that is not an instance of `expected_type`.
    """
    # The scalar check comes first so that iterable scalars (e.g. `str`) are not split apart.
    if isinstance(val, expected_type):
        if allow_single_scalar:
            return [val]
        raise ValueError(f"The value {val} must be wrapped in an iterable (e.g. a list).")

    if not isinstance(val, collections.abc.Iterable):
        raise ValueError(
            f"The value {val} (type {type(val)}) was not an iterable of {expected_type}."
        )

    checked: list[_T] = []
    for index, element in enumerate(val):
        if not isinstance(element, expected_type):
            raise ValueError(
                softwrap(
                    f"""
                    Not all elements of the iterable have type {expected_type}. Encountered the
                    element {element} of type {type(element)} at index {index}.
                    """
                )
            )
        checked.append(element)
    return checked
×
93

94

95
def ensure_str_list(val: str | Iterable[str], *, allow_single_str: bool = False) -> list[str]:
    """Validate that `val` contains only strings and return its elements as a list.

    If `allow_single_str` is True, a single `str` will be wrapped into a `List[str]`.
    """
    # String-specialised front end for `ensure_list`.
    strings: list[str] = ensure_list(
        val,
        expected_type=str,
        allow_single_scalar=allow_single_str,
    )
    return strings
×
101

102

103
def partition_sequentially(
    items: Iterable[_T],
    *,
    key: Callable[[_T], str],
    size_target: int,
    size_max: int | None = None,
) -> Iterator[list[_T]]:
    """Stably split the given items into batches of roughly `size_target` items each.

    "Stable" here means that adding a single item does not shift the boundaries of every
    batch, as it would if the items were naively windowed (e.g. with `itertools.islice`) and
    the new item landed near the front.

    Batches can optionally be capped at `size_max` items. Doing so can weaken the stability
    property, because the cap forces boundaries where the hashing scheme would not have put
    one.
    """

    # Approach: sort the items by key, then walk them in order and close the current batch
    # whenever an item's key hash starts with at least a threshold number of zero bits.
    #
    # The hashes behave like deterministic rolls of a fair die: a hash starts with Z zero bits
    # with probability 1/2^Z, so to close a batch every N items on average we require
    # `Z == log2(N)` zero bits.
    #
    # Because boundaries depend only on individual keys, adding or removing an item rarely
    # disturbs more than one batch. With `size_max` set, however, a change can ripple through
    # several consecutive batches.
    threshold = math.log(max(1, size_target), 2)

    ordered = sorted((key(entry), entry) for entry in items)

    current: list[_T] = []
    for entry_key, entry in ordered:
        current.append(entry)
        zero_bits = native_engine.hash_prefix_zero_bits(entry_key)
        if zero_bits >= threshold or (size_max and len(current) >= size_max):
            # Hand out the finished batch and start a fresh list for the next one.
            finished, current = current, []
            yield finished

    # Whatever is left after the final boundary forms the last batch.
    if current:
        yield current
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc