• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18252174847

05 Oct 2025 01:36AM UTC coverage: 43.382% (-36.9%) from 80.261%
18252174847

push

github

web-flow
run tests on mac arm (#22717)

Just doing the minimal to pull forward the x86_64 pattern.

ref #20993

25776 of 59416 relevant lines covered (43.38%)

1.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.27
/src/python/pants/util/collections.py
1
# Copyright 2017 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
3✔
5

6
import collections
3✔
7
import collections.abc
3✔
8
import gc
3✔
9
import math
3✔
10
from collections.abc import Callable, Iterable, Iterator, MutableMapping
3✔
11
from sys import getsizeof
3✔
12
from typing import Any, TypeVar
3✔
13

14
from pants.engine.internals import native_engine
3✔
15
from pants.util.strutil import softwrap
3✔
16

17

18
def recursively_update(d: MutableMapping, d2: MutableMapping) -> None:
    """dict.update but which merges child dicts (dict2 takes precedence where there's conflict).

    Mutates `d` in place. For keys present in both mappings, nested dicts are merged
    recursively; any other conflicting value is replaced by the one from `d2`.
    """
    for k, v in d2.items():
        # Only recurse when BOTH sides hold a dict. The previous version recursed
        # whenever `v` was a dict, which raised a TypeError if `d[k]` was a scalar;
        # in that case the `d2` value should simply win.
        if k in d and isinstance(v, dict) and isinstance(d[k], dict):
            recursively_update(d[k], v)
        else:
            d[k] = v
26

27

28
def deep_getsizeof(o: Any, ids: set[int]) -> int:
    """Find the memory footprint of the given object, including objects it references.

    To avoid double-counting, `ids` should be a set of object ids which have been visited by
    previous calls to this method. The set is mutated in place as objects are visited.
    """
    if id(o) in ids:
        return 0

    ids.add(id(o))
    # BUG FIX: the original called `gc.get_referents()` with no argument, which always
    # returns an empty list — so the function never recursed and returned only the
    # shallow size. `gc.get_referents(o)` yields the objects directly reachable from
    # `o` (container items, instance attributes, ...), which we size recursively.
    return getsizeof(o) + sum(deep_getsizeof(child, ids) for child in gc.get_referents(o))
42

43

44
# Generic element type shared by the collection helpers in this module.
_T = TypeVar("_T")
3✔
45

46

47
def assert_single_element(iterable: Iterable[_T]) -> _T:
    """Return the sole element of `iterable`, or raise an error.

    :raise: :class:`StopIteration` if there is no element.
    :raise: :class:`ValueError` if there is more than one element.
    """
    iterator = iter(iterable)
    element = next(iterator)

    # A sentinel distinguishes "exhausted" from any value the iterable could yield.
    sentinel = object()
    if next(iterator, sentinel) is not sentinel:
        raise ValueError(f"iterable {iterable!r} has more than one element.")
    return element
62

63

64
def ensure_list(
    val: Any | Iterable[Any], *, expected_type: type[_T], allow_single_scalar: bool = False
) -> list[_T]:
    """Ensure that every element of an iterable is the expected type and convert the result to a
    list.

    If `allow_single_scalar` is True, a single value T will be wrapped into a `List[T]`.
    """
    if isinstance(val, expected_type):
        # A bare scalar of the expected type: wrap it, or reject it, per the flag.
        if allow_single_scalar:
            return [val]
        raise ValueError(f"The value {val} must be wrapped in an iterable (e.g. a list).")
    if not isinstance(val, collections.abc.Iterable):
        raise ValueError(
            f"The value {val} (type {type(val)}) was not an iterable of {expected_type}."
        )
    checked: list[_T] = []
    for index, element in enumerate(val):
        if not isinstance(element, expected_type):
            raise ValueError(
                softwrap(
                    f"""
                    Not all elements of the iterable have type {expected_type}. Encountered the
                    element {element} of type {type(element)} at index {index}.
                    """
                )
            )
        checked.append(element)
    return checked
93

94

95
def ensure_str_list(val: str | Iterable[str], *, allow_single_str: bool = False) -> list[str]:
    """Validate that every element of an iterable is a `str` and return them as a list.

    If `allow_single_str` is True, a bare `str` is accepted and wrapped into a `List[str]`.
    """
    return ensure_list(val, expected_type=str, allow_single_scalar=allow_single_str)
101

102

103
def partition_sequentially(
    items: Iterable[_T],
    *,
    key: Callable[[_T], str],
    size_target: int,
    size_max: int | None = None,
) -> Iterator[list[_T]]:
    """Stably partitions the given items into batches of around `size_target` items.

    The "stability" property refers to avoiding adjusting all batches when a single item is added,
    which could happen if the items were trivially windowed using `itertools.islice` and an
    item was added near the front of the list.

    Batches will optionally be capped to `size_max`, but note that this can weaken the stability
    properties of the bucketing, by forcing bucket boundaries to be created where they otherwise
    might not.
    """
    # Sort the items by key and walk them in order, cutting a batch boundary
    # whenever an item's key hash begins with at least `log2(size_target)` zero
    # bits. The hashes behave like deterministic rolls of a fair die: a prefix of
    # Z zero bits occurs with probability 1/2^Z, so requiring Z == log2(N) zero
    # bits ends a batch after roughly N items on average.
    #
    # Because the boundaries are a deterministic function of the keys alone,
    # inserting or removing one item usually perturbs only its own bucket. A
    # `size_max` cap can force additional boundaries, though, which may ripple
    # into subsequent buckets.
    required_zero_bits = math.log(max(1, size_target), 2)

    pending: list[_T] = []
    for item_key, item in sorted((key(item), item) for item in items):
        pending.append(item)
        if native_engine.hash_prefix_zero_bits(item_key) >= required_zero_bits or (
            size_max and len(pending) >= size_max
        ):
            yield pending
            pending = []
    if pending:
        yield pending
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc