• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20328535594

18 Dec 2025 06:46AM UTC coverage: 57.969% (-22.3%) from 80.295%
20328535594

Pull #22954

github

web-flow
Merge ccc9c5409 into 407284c67
Pull Request #22954: free up disk space in runner image

39083 of 67421 relevant lines covered (57.97%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/testprojects/pants-plugins/src/python/python_constant/target_types.py
1
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
import ast
×
4
import logging
×
5
from collections import defaultdict
×
6
from dataclasses import dataclass
×
7
from typing import Any, DefaultDict, cast
×
8

9
from pants.backend.python.dependency_inference.module_mapper import (
×
10
    FirstPartyPythonModuleMapping,
11
    ResolveName,
12
)
13
from pants.backend.python.subsystems.setup import PythonSetup
×
14
from pants.backend.python.target_types import PythonResolveField, PythonSourceField
×
15
from pants.engine.addresses import Addresses
×
16
from pants.engine.collection import Collection
×
17
from pants.engine.internals.graph import hydrate_sources, resolve_source_paths
×
18
from pants.engine.internals.target_adaptor import SourceBlock, SourceBlocks
×
19
from pants.engine.intrinsics import get_digest_contents
×
20
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
×
21
from pants.engine.target import (
×
22
    COMMON_TARGET_FIELDS,
23
    AllTargets,
24
    Dependencies,
25
    FieldSet,
26
    GeneratedTargets,
27
    GenerateTargetsRequest,
28
    HydrateSourcesRequest,
29
    InferDependenciesRequest,
30
    InferredDependencies,
31
    IntField,
32
    SingleSourceField,
33
    SourcesPathsRequest,
34
    StringField,
35
    Target,
36
    TargetGenerator,
37
)
38
from pants.engine.unions import UnionRule
×
39
from pants.util.frozendict import FrozenDict
×
40
from pants.util.ordered_set import FrozenOrderedSet
×
41

42
logger = logging.getLogger(__name__)
×
43

44

45
class PythonConstantSourceField(SingleSourceField):
×
46
    required = True
×
47

48

49
class PythonConstantDependencies(Dependencies):
×
50
    pass
×
51

52

53
class PythonConstantLinenoField(IntField):
×
54
    alias = "lineno"
×
55

56

57
class PythonConstantEndLinenoField(IntField):
×
58
    alias = "end_lineno"
×
59

60

61
class PythonConstantNameField(StringField):
×
62
    alias = "constant"
×
63

64

65
class PythonConstantTarget(Target):
×
66
    alias = "python_constant"
×
67
    core_fields = (
×
68
        PythonConstantSourceField,
69
        PythonConstantNameField,
70
        PythonConstantLinenoField,
71
        PythonConstantEndLinenoField,
72
        PythonConstantDependencies,
73
    )
74

75

76
class PythonConstantTargetGenerator(TargetGenerator):
×
77
    alias = "python_constants"
×
78
    generated_target_cls = PythonConstantTarget
×
79
    core_fields = (
×
80
        *COMMON_TARGET_FIELDS,
81
        PythonConstantSourceField,
82
        PythonConstantDependencies,
83
    )
84
    copied_fields = (
×
85
        *COMMON_TARGET_FIELDS,
86
        PythonConstantSourceField,
87
    )
88
    moved_fields = (PythonConstantDependencies,)
×
89

90

91
class GeneratePythonConstantTargetsRequest(GenerateTargetsRequest):
×
92
    generate_from = PythonConstantTargetGenerator
×
93

94

95
@dataclass
×
96
class PythonConstant:
×
97
    name: str
×
98
    lineno: int
×
99
    end_lineno: int
×
100

101
    def to_source_block(self) -> SourceBlock:
×
102
        return SourceBlock(
×
103
            start=self.lineno,
104
            end=self.end_lineno + 1,
105
        )
106

107

108
class PythonConstantVisitor(ast.NodeVisitor):
×
109
    def __init__(self) -> None:
×
110
        super().__init__()
×
111
        self._constants: list[PythonConstant] = []
×
112

113
    def visit_Module(self, node: ast.Module) -> Any:
×
114
        for stmt in node.body:
×
115
            if isinstance(stmt, ast.Assign):
×
116
                for target in stmt.targets:
×
117
                    if isinstance(target, ast.Name):
×
118
                        assert stmt.end_lineno
×
119
                        self._constants.append(
×
120
                            PythonConstant(target.id, stmt.lineno, stmt.end_lineno)
121
                        )
122

123
    @classmethod
×
124
    def parse_constants(cls, content: bytes) -> list[PythonConstant]:
×
125
        parsed = ast.parse(content.decode("utf-8"))
×
126
        v = PythonConstantVisitor()
×
127
        v.visit(parsed)
×
128
        return v._constants
×
129

130

131
@rule
×
132
async def generate_python_constant_targets(
×
133
    request: GeneratePythonConstantTargetsRequest,
134
) -> GeneratedTargets:
135
    hydrated_sources = await hydrate_sources(
×
136
        HydrateSourcesRequest(request.generator[PythonConstantSourceField]), **implicitly()
137
    )
138
    logger.debug("python_constant sources: %s", hydrated_sources)
×
139
    digest_files = await get_digest_contents(hydrated_sources.snapshot.digest)
×
140
    content = digest_files[0].content
×
141
    python_constants = PythonConstantVisitor.parse_constants(content)
×
142
    logger.debug("parsed python_constants: %s", python_constants)
×
143
    return GeneratedTargets(
×
144
        request.generator,
145
        [
146
            PythonConstantTarget(
147
                {
148
                    **request.template,
149
                    PythonConstantNameField.alias: python_constant.name,
150
                },
151
                request.template_address.create_generated(python_constant.name),
152
                origin_sources_blocks=FrozenDict(
153
                    {
154
                        digest_files[0].path: SourceBlocks([python_constant.to_source_block()]),
155
                    }
156
                ),
157
            )
158
            for python_constant in python_constants
159
        ],
160
    )
161

162

163
@dataclass(frozen=True)
×
164
class InferPythonDependenciesOnPythonConstantsFieldSet(FieldSet):
×
165
    required_fields = (PythonSourceField, PythonResolveField)
×
166

167
    source: PythonSourceField
×
168
    resolve: PythonResolveField
×
169

170

171
class InferPythonDependenciesOnPythonConstantsRequest(
×
172
    InferDependenciesRequest[InferPythonDependenciesOnPythonConstantsFieldSet]
173
):
174
    infer_from = InferPythonDependenciesOnPythonConstantsFieldSet
×
175

176

177
@dataclass(frozen=True)
×
178
class Var:
×
179
    module: str
×
180
    name: str
×
181

182

183
class ImportVisitor(ast.NodeVisitor):
×
184
    def __init__(self, search_for_modules: set[str]) -> None:
×
185
        super().__init__()
×
186
        self._search_for = search_for_modules
×
187
        self._found: set[Var] = set()
×
188

189
    def visit_ImportFrom(self, node: ast.ImportFrom) -> Any:
×
190
        if node.module not in self._search_for:
×
191
            return
×
192

193
        for alias in node.names:
×
194
            self._found.add(Var(node.module, alias.name))
×
195

196
    @classmethod
×
197
    def search_for_vars(cls, content: bytes, modules: set[str]) -> set[Var]:
×
198
        parsed = ast.parse(content.decode("utf-8"))
×
199
        v = cls(modules)
×
200
        v.visit(parsed)
×
201
        return v._found
×
202

203

204
class AllPythonConstantTargets(Collection[PythonConstantTarget]):
×
205
    pass
×
206

207

208
@rule
×
209
async def get_python_constant_targets(targets: AllTargets) -> AllPythonConstantTargets:
×
210
    return AllPythonConstantTargets(
×
211
        cast(PythonConstantTarget, target)
212
        for target in targets
213
        if target.has_field(PythonConstantSourceField)
214
    )
215

216

217
class BackwardMapping(FrozenDict[ResolveName, FrozenDict[str, tuple[str, ...]]]):
×
218
    pass
×
219

220

221
@dataclass
×
222
class BackwardMappingRequest:
×
223
    addresses: Addresses
×
224

225

226
@rule
×
227
async def get_backward_mapping(
×
228
    python_constant_targets: AllPythonConstantTargets,
229
    mapping: FirstPartyPythonModuleMapping,
230
) -> BackwardMapping:
231
    paths = await concurrently(
×
232
        resolve_source_paths(
233
            SourcesPathsRequest(tgt.get(PythonConstantSourceField)), **implicitly()
234
        )
235
        for tgt in python_constant_targets
236
    )
237
    search_for = {file for path in paths for file in path.files}
×
238

239
    result: DefaultDict[str, DefaultDict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
×
240
    for resolve, m in mapping.resolves_to_modules_to_providers.items():
×
241
        for module, module_providers in m.items():
×
242
            for module_provider in module_providers:
×
243
                filename = module_provider.addr.filename
×
244
                if filename in search_for:
×
245
                    result[resolve][filename].append(module)
×
246

247
    return BackwardMapping(
×
248
        FrozenDict(
249
            (
250
                resolve,
251
                FrozenDict((filename, tuple(sorted(modules))) for filename, modules in m.items()),
252
            )
253
            for resolve, m in result.items()
254
        )
255
    )
256

257

258
@rule
×
259
async def infer_python_dependencies_on_python_constants(
×
260
    request: InferPythonDependenciesOnPythonConstantsRequest,
261
    python_setup: PythonSetup,
262
    python_constant_targets: AllPythonConstantTargets,
263
    mapping: FirstPartyPythonModuleMapping,
264
    backward_mapping: BackwardMapping,
265
) -> InferredDependencies:
266
    """Infers dependencies on PythonConstantTarget-s based on python source imports."""
267

268
    sources = await hydrate_sources(HydrateSourcesRequest(request.field_set.source), **implicitly())
×
269
    digest_files = await get_digest_contents(sources.snapshot.digest)
×
270
    content = digest_files[0].content
×
271
    resolve = request.field_set.resolve.normalized_value(python_setup)
×
272
    assert resolve is not None, "resolve is None"
×
273

274
    if not backward_mapping:
×
275
        raise ValueError("empty backward mapping")
×
276

277
    paths = await concurrently(
×
278
        resolve_source_paths(
279
            SourcesPathsRequest(tgt.get(PythonConstantSourceField)), **implicitly()
280
        )
281
        for tgt in python_constant_targets
282
    )
283
    logger.debug("backward mapping %s", backward_mapping)
×
284
    interesting_modules = {
×
285
        module
286
        for path in paths
287
        for filename in path.files
288
        for module in backward_mapping[resolve][filename]
289
    }
290

291
    logger.debug("interesting_modules %s", interesting_modules)
×
292
    vars = ImportVisitor.search_for_vars(content, interesting_modules)
×
293
    logger.debug("vars %s", vars)
×
294

295
    filenames_to_python_constant_targets: DefaultDict[str, list[PythonConstantTarget]] = (
×
296
        defaultdict(list)
297
    )
298
    for path, target in zip(paths, python_constant_targets):
×
299
        for filename in path.files:
×
300
            filenames_to_python_constant_targets[filename].append(target)
×
301

302
    include = set()
×
303
    for var in vars:
×
304
        for provider in mapping.resolves_to_modules_to_providers[resolve][var.module]:
×
305
            targets = filenames_to_python_constant_targets[provider.addr.filename]
×
306
            for target in targets:
×
307
                name = target.get(PythonConstantNameField).value
×
308
                logger.debug("check for var %s %s", name, var.name)
×
309
                if name == var.name:
×
310
                    include.add(target.address)
×
311

312
    logger.debug("include %s", include)
×
313
    return InferredDependencies(
×
314
        include=FrozenOrderedSet(include),
315
        exclude=FrozenOrderedSet(),
316
    )
317

318

319
def rules():
×
320
    return (
×
321
        *collect_rules(),
322
        UnionRule(GenerateTargetsRequest, GeneratePythonConstantTargetsRequest),
323
        UnionRule(InferDependenciesRequest, InferPythonDependenciesOnPythonConstantsRequest),
324
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc