• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 24142259555

08 Apr 2026 02:58PM UTC coverage: 91.259% (-1.7%) from 92.909%
24142259555

Pull #23228

github

web-flow
Merge 1e468fa49 into 9036734c9
Pull Request #23228: Add persistent dependency inference cache for incremental --changed-dependents

266 of 317 new or added lines in 3 files covered. (83.91%)

1448 existing lines in 70 files now uncovered.

86176 of 94430 relevant lines covered (91.26%)

3.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.7
/src/python/pants/backend/project_info/dependents.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
import json
10✔
4
import logging
10✔
5
import os
10✔
6
import time
10✔
7
from collections import defaultdict
10✔
8
from collections.abc import Iterable
10✔
9
from dataclasses import dataclass
10✔
10
from enum import Enum
10✔
11

12
from pants.backend.project_info.incremental_dependents import (
10✔
13
    CachedEntry,
14
    compute_source_fingerprint,
15
    get_cache_path,
16
    load_persisted_graph,
17
    save_persisted_graph,
18
)
19
from pants.base.build_environment import get_buildroot
10✔
20
from pants.engine.addresses import Address, Addresses
10✔
21
from pants.engine.collection import DeduplicatedCollection
10✔
22
from pants.engine.console import Console
10✔
23
from pants.engine.goal import Goal, GoalSubsystem, LineOriented
10✔
24
from pants.engine.internals.graph import resolve_dependencies
10✔
25
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
10✔
26
from pants.engine.target import (
10✔
27
    AllUnexpandedTargets,
28
    AlwaysTraverseDeps,
29
    Dependencies,
30
    DependenciesRequest,
31
)
32
from pants.option.option_types import BoolOption, EnumOption
10✔
33
from pants.util.frozendict import FrozenDict
10✔
34
from pants.util.logging import LogLevel
10✔
35
from pants.util.ordered_set import FrozenOrderedSet
10✔
36

37
logger = logging.getLogger(__name__)
10✔
38

39

40
@dataclass(frozen=True)
10✔
41
class AddressToDependents:
10✔
42
    mapping: FrozenDict[Address, FrozenOrderedSet[Address]]
43

44

45
class DependentsOutputFormat(Enum):
10✔
46
    """Output format for listing dependents.
47

48
    text: List all dependents as a single list of targets in plain text.
49
    json: List all dependents as a mapping `{target: [dependents]}`.
50
    """
51

52
    text = "text"
10✔
53
    json = "json"
10✔
54

55

56
@rule(desc="Map all targets to their dependents", level=LogLevel.DEBUG)
10✔
57
async def map_addresses_to_dependents(
10✔
58
    all_targets: AllUnexpandedTargets,
59
) -> AddressToDependents:
60
    """Build a reverse dependency map (target -> set of its dependents).
61

62
    When incremental mode is enabled via the PANTS_INCREMENTAL_DEPENDENTS environment
63
    variable, the forward dependency graph is persisted to disk. On subsequent runs,
64
    only targets whose source files have changed need their dependencies re-resolved,
65
    dramatically reducing wall time for large repos.
66
    """
67
    if not os.environ.get("PANTS_INCREMENTAL_DEPENDENTS"):
2✔
68
        # Original behavior: resolve all dependencies from scratch.
69
        dependencies_per_target = await concurrently(
2✔
70
            resolve_dependencies(
71
                DependenciesRequest(
72
                    tgt.get(Dependencies),
73
                    should_traverse_deps_predicate=AlwaysTraverseDeps(),
74
                ),
75
                **implicitly(),
76
            )
77
            for tgt in all_targets
78
        )
79

80
        address_to_dependents = defaultdict(set)
2✔
81
        for tgt, dependencies in zip(all_targets, dependencies_per_target):
2✔
82
            for dependency in dependencies:
2✔
83
                address_to_dependents[dependency].add(tgt.address)
2✔
84
        return AddressToDependents(
2✔
85
            FrozenDict(
86
                {
87
                    addr: FrozenOrderedSet(dependents)
88
                    for addr, dependents in address_to_dependents.items()
89
                }
90
            )
91
        )
92

93
    # --- Incremental mode ---
NEW
94
    start_time = time.time()
×
NEW
95
    buildroot = get_buildroot()
×
NEW
96
    cache_path = get_cache_path()
×
97

98
    # Step 1: Load previous graph
NEW
99
    previous = load_persisted_graph(cache_path, buildroot)
×
NEW
100
    logger.warning(
×
101
        "Incremental dep graph: loaded %d cached entries from %s",
102
        len(previous),
103
        cache_path,
104
    )
105

106
    # Step 2: Classify targets as cached or changed
NEW
107
    changed_targets = []
×
NEW
108
    cached_results: list[tuple[Address, CachedEntry]] = []
×
109

NEW
110
    for tgt in all_targets:
×
NEW
111
        spec = tgt.address.spec
×
NEW
112
        fingerprint = compute_source_fingerprint(tgt.address, buildroot)
×
113

NEW
114
        cached_entry = previous.get(spec)
×
NEW
115
        if cached_entry is not None and cached_entry.fingerprint == fingerprint:
×
NEW
116
            cached_results.append((tgt.address, cached_entry))
×
117
        else:
NEW
118
            changed_targets.append(tgt)
×
119

NEW
120
    cache_hits = len(cached_results)
×
NEW
121
    cache_misses = len(changed_targets)
×
NEW
122
    logger.warning(
×
123
        "Incremental dep graph: %d cached, %d changed (out of %d total targets)",
124
        cache_hits,
125
        cache_misses,
126
        len(all_targets),
127
    )
128

129
    # Step 3: Resolve deps only for changed targets
NEW
130
    if changed_targets:
×
NEW
131
        fresh_deps_per_target = await concurrently(
×
132
            resolve_dependencies(
133
                DependenciesRequest(
134
                    tgt.get(Dependencies),
135
                    should_traverse_deps_predicate=AlwaysTraverseDeps(),
136
                ),
137
                **implicitly(),
138
            )
139
            for tgt in changed_targets
140
        )
141
    else:
NEW
142
        fresh_deps_per_target = []
×
143

144
    # Step 4: Build the reverse dependency map from merged results
NEW
145
    address_to_dependents: dict[Address, set[Address]] = defaultdict(set)
×
146

147
    # Build a spec → Address lookup from all_targets for resolving cached specs
NEW
148
    spec_to_address: dict[str, Address] = {tgt.address.spec: tgt.address for tgt in all_targets}
×
149

150
    # Process cached results (deps stored as address spec strings)
NEW
151
    for addr, entry in cached_results:
×
NEW
152
        for dep_spec in entry.deps:
×
NEW
153
            dep_addr = spec_to_address.get(dep_spec)
×
NEW
154
            if dep_addr is not None:
×
NEW
155
                address_to_dependents[dep_addr].add(addr)
×
156

157
    # Process freshly resolved results
NEW
158
    for tgt, deps in zip(changed_targets, fresh_deps_per_target):
×
NEW
159
        for dep_addr in deps:
×
NEW
160
            address_to_dependents[dep_addr].add(tgt.address)
×
161

162
    # Step 5: Save the updated forward graph for next run
NEW
163
    new_entries: dict[str, CachedEntry] = {}
×
164

165
    # Carry forward cached entries
NEW
166
    for addr, entry in cached_results:
×
NEW
167
        new_entries[addr.spec] = entry
×
168

169
    # Add fresh entries
NEW
170
    for tgt, deps in zip(changed_targets, fresh_deps_per_target):
×
NEW
171
        spec = tgt.address.spec
×
NEW
172
        fingerprint = compute_source_fingerprint(tgt.address, buildroot)
×
NEW
173
        new_entries[spec] = CachedEntry(
×
174
            fingerprint=fingerprint,
175
            deps=tuple(dep.spec for dep in deps),
176
        )
177

NEW
178
    save_persisted_graph(cache_path, buildroot, new_entries)
×
179

NEW
180
    elapsed = time.time() - start_time
×
NEW
181
    logger.warning(
×
182
        "Incremental dep graph: completed in %.1fs (%d from cache, %d resolved fresh)",
183
        elapsed,
184
        cache_hits,
185
        cache_misses,
186
    )
187

UNCOV
188
    return AddressToDependents(
×
189
        FrozenDict(
190
            {
191
                addr: FrozenOrderedSet(dependents)
192
                for addr, dependents in address_to_dependents.items()
193
            }
194
        )
195
    )
196

197

198
@dataclass(frozen=True)
10✔
199
class DependentsRequest:
10✔
200
    addresses: FrozenOrderedSet[Address]
201
    transitive: bool
202
    include_roots: bool
203

204
    def __init__(
10✔
205
        self, addresses: Iterable[Address], *, transitive: bool, include_roots: bool
206
    ) -> None:
207
        object.__setattr__(self, "addresses", FrozenOrderedSet(addresses))
2✔
208
        object.__setattr__(self, "transitive", transitive)
2✔
209
        object.__setattr__(self, "include_roots", include_roots)
2✔
210

211

212
class Dependents(DeduplicatedCollection[Address]):
10✔
213
    sort_input = True
10✔
214

215

216
@rule(level=LogLevel.DEBUG)
10✔
217
async def find_dependents(
10✔
218
    request: DependentsRequest, address_to_dependents: AddressToDependents
219
) -> Dependents:
220
    check = set(request.addresses)
2✔
221
    known_dependents: set[Address] = set()
2✔
222
    while True:
2✔
223
        dependents = set(known_dependents)
2✔
224
        for target in check:
2✔
225
            target_dependents = address_to_dependents.mapping.get(target, FrozenOrderedSet())
2✔
226
            dependents.update(target_dependents)
2✔
227
        check = dependents - known_dependents
2✔
228
        if not check or not request.transitive:
2✔
229
            result = (
2✔
230
                dependents | set(request.addresses)
231
                if request.include_roots
232
                else dependents - set(request.addresses)
233
            )
234
            return Dependents(result)
2✔
235
        known_dependents = dependents
2✔
236

237

238
class DependentsSubsystem(LineOriented, GoalSubsystem):
10✔
239
    name = "dependents"
10✔
240
    help = "List all targets that depend on any of the input files/targets."
10✔
241

242
    transitive = BoolOption(
10✔
243
        default=False,
244
        help="List all transitive dependents. If unspecified, list direct dependents only.",
245
    )
246
    closed = BoolOption(
10✔
247
        default=False,
248
        help="Include the input targets in the output, along with the dependents.",
249
    )
250
    format = EnumOption(
10✔
251
        default=DependentsOutputFormat.text,
252
        help="Output format for listing dependents.",
253
    )
254

255

256
class DependentsGoal(Goal):
10✔
257
    subsystem_cls = DependentsSubsystem
10✔
258
    environment_behavior = Goal.EnvironmentBehavior.LOCAL_ONLY
10✔
259

260

261
async def list_dependents_as_plain_text(
10✔
262
    addresses: Addresses, dependents_subsystem: DependentsSubsystem, console: Console
263
) -> None:
264
    """Get dependents for given addresses and list them in the console as a single list."""
265
    dependents = await find_dependents(
2✔
266
        DependentsRequest(
267
            addresses,
268
            transitive=dependents_subsystem.transitive,
269
            include_roots=dependents_subsystem.closed,
270
        ),
271
        **implicitly(),
272
    )
273
    with dependents_subsystem.line_oriented(console) as print_stdout:
2✔
274
        for address in dependents:
2✔
275
            print_stdout(address.spec)
2✔
276

277

278
async def list_dependents_as_json(
10✔
279
    addresses: Addresses, dependents_subsystem: DependentsSubsystem, console: Console
280
) -> None:
281
    """Get dependents for given addresses and list them in the console in JSON."""
282
    dependents_group = await concurrently(
1✔
283
        find_dependents(
284
            DependentsRequest(
285
                (address,),
286
                transitive=dependents_subsystem.transitive,
287
                include_roots=dependents_subsystem.closed,
288
            ),
289
            **implicitly(),
290
        )
291
        for address in addresses
292
    )
293
    iterated_addresses = []
1✔
294
    for dependents in dependents_group:
1✔
295
        iterated_addresses.append(sorted([str(address) for address in dependents]))
1✔
296
    mapping = dict(zip([str(address) for address in addresses], iterated_addresses))
1✔
297
    output = json.dumps(mapping, indent=4)
1✔
298
    with dependents_subsystem.line_oriented(console) as print_stdout:
1✔
299
        print_stdout(output)
1✔
300

301

302
@goal_rule
10✔
303
async def dependents_goal(
10✔
304
    specified_addresses: Addresses, dependents_subsystem: DependentsSubsystem, console: Console
305
) -> DependentsGoal:
306
    if DependentsOutputFormat.text == dependents_subsystem.format:
2✔
307
        await list_dependents_as_plain_text(
2✔
308
            addresses=specified_addresses,
309
            dependents_subsystem=dependents_subsystem,
310
            console=console,
311
        )
312
    elif DependentsOutputFormat.json == dependents_subsystem.format:
1✔
313
        await list_dependents_as_json(
1✔
314
            addresses=specified_addresses,
315
            dependents_subsystem=dependents_subsystem,
316
            console=console,
317
        )
318
    return DependentsGoal(exit_code=0)
2✔
319

320

321
def rules():
10✔
322
    return collect_rules()
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc