pantsbuild / pants / build 21932133767
12 Feb 2026 03:13AM UTC. Coverage: 80.353% (-0.002% from 80.355%)

Pull Request #23098: Support custom globs in Python tailoring.
Merge 2c71295e1 into f0adfc9c7 (via github; committer: web-flow)

16 of 17 new or added lines in 4 files covered (94.12%).
6 existing lines in 2 files are now uncovered.
78784 of 98047 relevant lines covered (80.35%); 3.36 hits per line.

Source file: /src/python/pants/backend/python/goals/tailor.py (file coverage: 33.97%)
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import logging
import os
import re
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import PurePath
from typing import Sequence

from pants.backend.python.dependency_inference.module_mapper import module_from_stripped_path
from pants.backend.python.macros.pipenv_requirements import parse_pipenv_requirements
from pants.backend.python.macros.poetry_requirements import PyProjectToml, parse_pyproject_toml
from pants.backend.python.macros.python_requirements import (
    parse_pyproject_toml as parse_pep621_pyproject_toml,
)
from pants.backend.python.subsystems.setup import PythonSetup
from pants.backend.python.target_types import (
    PexBinary,
    PexEntryPointField,
    PythonSourcesGeneratorTarget,
    PythonTestsGeneratorTarget,
    PythonTestUtilsGeneratorTarget,
    ResolvePexEntryPointRequest,
)
from pants.backend.python.target_types_rules import resolve_pex_entry_point
from pants.base.specs import AncestorGlobSpec, RawSpecs
from pants.core.goals.tailor import (
    AllOwnedSources,
    PutativeTarget,
    PutativeTargets,
    PutativeTargetsRequest,
)
from pants.core.target_types import ResourceTarget
from pants.engine.fs import FileContent, PathGlobs
from pants.engine.internals.graph import resolve_unexpanded_targets
from pants.engine.internals.selectors import concurrently
from pants.engine.intrinsics import get_digest_contents, path_globs_to_paths
from pants.engine.rules import collect_rules, implicitly, rule
from pants.engine.target import Target
from pants.engine.unions import UnionRule
from pants.source.filespec import FilespecMatcher
from pants.source.source_root import SourceRootsRequest, get_source_roots
from pants.util.dirutil import group_by_dir
from pants.util.logging import LogLevel
from pants.util.requirements import parse_requirements_file

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class PutativePythonTargetsRequest(PutativeTargetsRequest):
    pass

def classify_source_files(
    paths: Iterable[str], test_file_globs: Sequence[str], testutils_file_globs: Sequence[str]
) -> dict[type[Target], set[str]]:
    """Returns a dict of target type -> files that belong to targets of that type."""
    tests_filespec_matcher = FilespecMatcher(test_file_globs, ())
    test_utils_filespec_matcher = FilespecMatcher(testutils_file_globs, ())

    path_to_file_name = {path: os.path.basename(path) for path in paths}
    test_file_names = set(tests_filespec_matcher.matches(list(path_to_file_name.values())))
    test_util_file_names = set(
        test_utils_filespec_matcher.matches(list(path_to_file_name.values()))
    )

    test_files = {
        path for path, file_name in path_to_file_name.items() if file_name in test_file_names
    }
    test_util_files = {
        path for path, file_name in path_to_file_name.items() if file_name in test_util_file_names
    }
    library_files = set(paths) - test_files - test_util_files
    return {
        PythonTestsGeneratorTarget: test_files,
        PythonTestUtilsGeneratorTarget: test_util_files,
        PythonSourcesGeneratorTarget: library_files,
    }
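# [Editor's illustrative sketch, not part of the original file] How classify_source_files
# buckets paths. The glob values below are example inputs, not necessarily Pants' defaults:
#
#   classify_source_files(
#       ["src/app/util.py", "src/app/util_test.py", "src/app/conftest.py"],
#       test_file_globs=["test_*.py", "*_test.py", "tests.py"],
#       testutils_file_globs=["conftest.py"],
#   )
#   # -> {PythonTestsGeneratorTarget: {"src/app/util_test.py"},
#   #     PythonTestUtilsGeneratorTarget: {"src/app/conftest.py"},
#   #     PythonSourcesGeneratorTarget: {"src/app/util.py"}}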

# The order "__main__" == __name__ would also technically work, but is very
# non-idiomatic, so we ignore it.
_entry_point_re = re.compile(rb"^if __name__ +== +['\"]__main__['\"]: *(#.*)?$", re.MULTILINE)


def is_entry_point(content: bytes) -> bool:
    # Identify files that look like entry points.  We use a regex for speed, as it will catch
    # almost all correct cases in practice, with extremely rare false positives (we will only
    # have a false positive if the matching code is in a multiline string indented all the way
    # to the left). Looking at the ast would be more correct, technically, but also more laborious,
    # trickier to implement correctly for different interpreter versions, and much slower.
    return _entry_point_re.search(content) is not None
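# [Editor's illustrative sketch, not part of the original file] What the entry-point
# heuristic accepts and rejects:
#
#   is_entry_point(b'if __name__ == "__main__":\n    main()\n')      # True
#   is_entry_point(b"if __name__ == '__main__':  # run\n    ...\n")  # True (trailing comment is fine)
#   is_entry_point(b'if "__main__" == __name__:\n    main()\n')      # False (reversed, non-idiomatic order)
#   is_entry_point(b"def main():\n    ...\n")                        # False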

async def _find_resource_py_typed_targets(
    py_typed_files_globs: PathGlobs, all_owned_sources: AllOwnedSources
) -> list[PutativeTarget]:
    """Find resource targets that may be created after discovering any `py.typed` files."""
    all_py_typed_files = await path_globs_to_paths(py_typed_files_globs)
    unowned_py_typed_files = set(all_py_typed_files.files) - set(all_owned_sources)

    putative_targets = []
    for dirname, filenames in group_by_dir(unowned_py_typed_files).items():
        putative_targets.append(
            PutativeTarget.for_target_type(
                ResourceTarget,
                kwargs={"source": "py.typed"},
                path=dirname,
                name="py_typed",
                triggering_sources=sorted(filenames),
            )
        )
    return putative_targets
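# [Editor's illustrative sketch, not part of the original file] For an unowned file
# src/app/py.typed, the putative target built above corresponds to a BUILD entry in
# src/app/BUILD roughly like:
#
#   resource(name="py_typed", source="py.typed")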

async def _find_source_targets(
    py_files_globs: PathGlobs, all_owned_sources: AllOwnedSources, python_setup: PythonSetup
) -> list[PutativeTarget]:
    result = []
    check_if_init_file_empty: dict[str, tuple[str, str]] = {}  # full_path: (dirname, filename)
    all_py_files = await path_globs_to_paths(py_files_globs)
    unowned_py_files = set(all_py_files.files) - set(all_owned_sources)
    classified_unowned_py_files = classify_source_files(
        unowned_py_files,
        python_setup.tailor_test_file_globs,
        python_setup.tailor_testutils_file_globs,
    )
    for tgt_type, paths in classified_unowned_py_files.items():
        for dirname, filenames in group_by_dir(paths).items():
            name: str | None
            if issubclass(tgt_type, PythonTestsGeneratorTarget):
                name = "tests"
            elif issubclass(tgt_type, PythonTestUtilsGeneratorTarget):
                name = "test_utils"
            else:
                name = None
            if (
                python_setup.tailor_ignore_empty_init_files
                and tgt_type == PythonSourcesGeneratorTarget
                and filenames in ({"__init__.py"}, {"__init__.pyi"})
            ):
                f = next(iter(filenames))
                check_if_init_file_empty[os.path.join(dirname, f)] = (dirname, f)
            else:
                result.append(
                    PutativeTarget.for_target_type(
                        tgt_type, path=dirname, name=name, triggering_sources=sorted(filenames)
                    )
                )

    if check_if_init_file_empty:
        init_contents = await get_digest_contents(
            **implicitly(PathGlobs(check_if_init_file_empty.keys()))
        )
        for file_content in init_contents:
            if not file_content.content.strip():
                continue
            d, f = check_if_init_file_empty[file_content.path]
            result.append(
                PutativeTarget.for_target_type(
                    PythonSourcesGeneratorTarget, path=d, name=None, triggering_sources=[f]
                )
            )

    return result
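# [Editor's illustrative sketch, not part of the original file] Targets this helper proposes
# for a small unowned tree, assuming the configured test/testutils globs match *_test.py and
# conftest.py respectively:
#
#   src/app/util.py       -> python_sources()                  (name=None: default target name)
#   src/app/util_test.py  -> python_tests(name="tests")
#   src/app/conftest.py   -> python_test_utils(name="test_utils")
#   src/app/__init__.py   -> skipped when empty and tailor_ignore_empty_init_files is enabled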

@rule(level=LogLevel.DEBUG, desc="Determine candidate Python targets to create")
async def find_putative_targets(
    req: PutativePythonTargetsRequest,
    all_owned_sources: AllOwnedSources,
    python_setup: PythonSetup,
) -> PutativeTargets:
    pts = []
    all_py_files_globs: PathGlobs = req.path_globs("*.py", "*.pyi")

    if python_setup.tailor_source_targets:
        source_targets = await _find_source_targets(
            all_py_files_globs, all_owned_sources, python_setup
        )
        pts.extend(source_targets)

    if python_setup.tailor_py_typed_targets:
        all_py_typed_files_globs: PathGlobs = req.path_globs("py.typed")
        resource_targets = await _find_resource_py_typed_targets(
            all_py_typed_files_globs, all_owned_sources
        )
        pts.extend(resource_targets)

    if python_setup.tailor_requirements_targets:
        # Find requirements files.
        (
            all_requirements_files,
            all_pipenv_lockfile_files,
            all_pyproject_toml_contents,
        ) = await concurrently(
            get_digest_contents(**implicitly(req.path_globs("*requirements*.txt"))),
            get_digest_contents(**implicitly(req.path_globs("Pipfile.lock"))),
            get_digest_contents(**implicitly(req.path_globs("pyproject.toml"))),
        )

        def add_req_targets(files: Iterable[FileContent], alias: str, target_name: str) -> None:
            contents = {i.path: i.content for i in files}
            unowned_files = set(contents) - set(all_owned_sources)
            for fp in unowned_files:
                path, name = os.path.split(fp)

                try:
                    validate(fp, contents[fp], alias, target_name)
                except Exception as e:
                    logger.warning(
                        f"An error occurred when validating `{fp}`: {e}.\n\n"
                        "You'll need to create targets for its contents manually.\n"
                        "To silence this error in future, see "
                        "https://www.pantsbuild.org/docs/reference-tailor#section-ignore-paths \n"
                    )
                    continue

                pts.append(
                    PutativeTarget(
                        path=path,
                        name=target_name,
                        type_alias=alias,
                        triggering_sources=[fp],
                        owned_sources=[name],
                        kwargs=(
                            {}
                            if alias != "python_requirements" or name == "requirements.txt"
                            else {"source": name}
                        ),
                    )
                )

        def validate(path: str, contents: bytes, alias: str, target_name: str) -> None:
            if alias == "python_requirements":
                if path.endswith("pyproject.toml"):
                    return validate_pep621_requirements(path, contents)
                return validate_python_requirements(path, contents)
            elif alias == "pipenv_requirements":
                return validate_pipenv_requirements(contents)
            elif alias == "poetry_requirements":
                return validate_poetry_requirements(contents)

        def validate_python_requirements(path: str, contents: bytes) -> None:
            for _ in parse_requirements_file(contents.decode(), rel_path=path):
                pass

        def validate_pep621_requirements(path: str, contents: bytes) -> None:
            list(parse_pep621_pyproject_toml(contents.decode(), rel_path=path))

        def validate_pipenv_requirements(contents: bytes) -> None:
            parse_pipenv_requirements(contents)

        def validate_poetry_requirements(contents: bytes) -> None:
            p = PyProjectToml(PurePath(), PurePath(), contents.decode())
            parse_pyproject_toml(p)

        add_req_targets(all_requirements_files, "python_requirements", "reqs")
        add_req_targets(all_pipenv_lockfile_files, "pipenv_requirements", "pipenv")
        add_req_targets(
            {fc for fc in all_pyproject_toml_contents if b"[tool.poetry" in fc.content},
            "poetry_requirements",
            "poetry",
        )

        def pyproject_toml_has_pep621(fc) -> bool:
            try:
                return (
                    len(list(parse_pep621_pyproject_toml(fc.content.decode(), rel_path=fc.path)))
                    > 0
                )
            except Exception:
                return False

        add_req_targets(
            {fc for fc in all_pyproject_toml_contents if pyproject_toml_has_pep621(fc)},
            "python_requirements",
            "reqs",
        )

    if python_setup.tailor_pex_binary_targets:
        # Find binary targets.

        # Get all files whose content indicates that they are entry points or are __main__.py files.
        digest_contents = await get_digest_contents(**implicitly({all_py_files_globs: PathGlobs}))
        all_main_py = await path_globs_to_paths(req.path_globs("__main__.py"))
        entry_points = [
            file_content.path
            for file_content in digest_contents
            if is_entry_point(file_content.content)
        ] + list(all_main_py.files)

        # Get the modules for these entry points.
        src_roots = await get_source_roots(SourceRootsRequest.for_files(entry_points))
        module_to_entry_point = {}
        for entry_point in entry_points:
            entry_point_path = PurePath(entry_point)
            src_root = src_roots.path_to_root[entry_point_path]
            stripped_entry_point = entry_point_path.relative_to(src_root.path)
            module = module_from_stripped_path(stripped_entry_point)
            module_to_entry_point[module] = entry_point

        # Get existing binary targets for these entry points.
        entry_point_dirs = {os.path.dirname(entry_point) for entry_point in entry_points}
        possible_existing_binary_targets = await resolve_unexpanded_targets(
            **implicitly(
                RawSpecs(
                    ancestor_globs=tuple(AncestorGlobSpec(d) for d in entry_point_dirs),
                    description_of_origin="the `pex_binary` tailor rule",
                )
            )
        )

        possible_existing_binary_entry_points = await concurrently(
            resolve_pex_entry_point(ResolvePexEntryPointRequest(t[PexEntryPointField]))
            for t in possible_existing_binary_targets
            if t.has_field(PexEntryPointField)
        )
        possible_existing_entry_point_modules = {
            rep.val.module for rep in possible_existing_binary_entry_points if rep.val
        }
        unowned_entry_point_modules = (
            module_to_entry_point.keys() - possible_existing_entry_point_modules
        )

        # Generate new targets for entry points that don't already have one.
        for entry_point_module in unowned_entry_point_modules:
            entry_point = module_to_entry_point[entry_point_module]
            path, fname = os.path.split(entry_point)
            name = os.path.splitext(fname)[0]
            pts.append(
                PutativeTarget.for_target_type(
                    target_type=PexBinary,
                    path=path,
                    name=name,
                    triggering_sources=tuple(),
                    kwargs={"entry_point": fname},
                )
            )

    return PutativeTargets(pts)
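# [Editor's illustrative sketch, not part of the original file] An end-to-end example: for an
# unowned tree like
#
#   src/app/main.py           (contains `if __name__ == "__main__":`)
#   src/app/requirements.txt
#
# and with the relevant tailor options on PythonSetup enabled, this rule would propose BUILD
# entries along the lines of:
#
#   python_sources()
#   pex_binary(name="main", entry_point="main.py")
#   python_requirements(name="reqs")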

def rules():
    return [
        *collect_rules(),
        UnionRule(PutativeTargetsRequest, PutativePythonTargetsRequest),
    ]
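# [Editor's note: assumption, not shown in this file] These rules are typically aggregated by
# the Python backend's register.py alongside the rest of the backend's rules, e.g.:
#
#   from pants.backend.python.goals import tailor as python_tailor
#
#   def rules():
#       return [*python_tailor.rules(), ...]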