• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.1
/src/python/pants/backend/python/goals/tailor.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import logging
2✔
7
import os
2✔
8
import re
2✔
9
from collections.abc import Iterable
2✔
10
from dataclasses import dataclass
2✔
11
from pathlib import PurePath
2✔
12

13
from pants.backend.python.dependency_inference.module_mapper import module_from_stripped_path
2✔
14
from pants.backend.python.macros.pipenv_requirements import parse_pipenv_requirements
2✔
15
from pants.backend.python.macros.poetry_requirements import PyProjectToml, parse_pyproject_toml
2✔
16
from pants.backend.python.macros.python_requirements import (
2✔
17
    parse_pyproject_toml as parse_pep621_pyproject_toml,
18
)
19
from pants.backend.python.subsystems.setup import PythonSetup
2✔
20
from pants.backend.python.target_types import (
2✔
21
    PexBinary,
22
    PexEntryPointField,
23
    PythonSourcesGeneratorTarget,
24
    PythonTestsGeneratingSourcesField,
25
    PythonTestsGeneratorTarget,
26
    PythonTestUtilsGeneratingSourcesField,
27
    PythonTestUtilsGeneratorTarget,
28
    ResolvePexEntryPointRequest,
29
)
30
from pants.backend.python.target_types_rules import resolve_pex_entry_point
2✔
31
from pants.base.specs import AncestorGlobSpec, RawSpecs
2✔
32
from pants.core.goals.tailor import (
2✔
33
    AllOwnedSources,
34
    PutativeTarget,
35
    PutativeTargets,
36
    PutativeTargetsRequest,
37
)
38
from pants.core.target_types import ResourceTarget
2✔
39
from pants.engine.fs import FileContent, PathGlobs
2✔
40
from pants.engine.internals.graph import resolve_unexpanded_targets
2✔
41
from pants.engine.internals.selectors import concurrently
2✔
42
from pants.engine.intrinsics import get_digest_contents, path_globs_to_paths
2✔
43
from pants.engine.rules import collect_rules, implicitly, rule
2✔
44
from pants.engine.target import Target
2✔
45
from pants.engine.unions import UnionRule
2✔
46
from pants.source.filespec import FilespecMatcher
2✔
47
from pants.source.source_root import SourceRootsRequest, get_source_roots
2✔
48
from pants.util.dirutil import group_by_dir
2✔
49
from pants.util.logging import LogLevel
2✔
50
from pants.util.requirements import parse_requirements_file
2✔
51

52
# Module-level logger, used to warn (not fail) when a requirements file cannot
# be validated during tailoring.
logger = logging.getLogger(__name__)
53

54

55
@dataclass(frozen=True)
class PutativePythonTargetsRequest(PutativeTargetsRequest):
    """Union member that requests candidate Python targets for the `tailor` goal.

    Carries no extra data beyond the base request; it exists so the engine can
    dispatch to the Python-specific rule via the `PutativeTargetsRequest` union
    (see `rules()` below).
    """

    pass
58

59

60
def classify_source_files(paths: Iterable[str]) -> dict[type[Target], set[str]]:
    """Returns a dict of target type -> files that belong to targets of that type.

    Classification is by file *name* only: names matching the default
    `python_tests` source globs are test files, names matching the default
    `python_test_utils` globs are test-util files, and everything else is
    treated as library source. Every input path appears in exactly one bucket.
    """
    tests_filespec_matcher = FilespecMatcher(PythonTestsGeneratingSourcesField.default, ())
    test_utils_filespec_matcher = FilespecMatcher(PythonTestUtilsGeneratingSourcesField.default, ())

    path_to_file_name = {path: os.path.basename(path) for path in paths}
    # Materialize the file-name list once; the original built it separately for
    # each matcher.
    file_names = list(path_to_file_name.values())
    test_file_names = set(tests_filespec_matcher.matches(file_names))
    test_util_file_names = set(test_utils_filespec_matcher.matches(file_names))

    test_files = {
        path for path, file_name in path_to_file_name.items() if file_name in test_file_names
    }
    test_util_files = {
        path for path, file_name in path_to_file_name.items() if file_name in test_util_file_names
    }
    # Whatever matched neither set of globs is plain library source.
    library_files = set(paths) - test_files - test_util_files
    return {
        PythonTestsGeneratorTarget: test_files,
        PythonTestUtilsGeneratorTarget: test_util_files,
        PythonSourcesGeneratorTarget: library_files,
    }
83

84

85
# Matches a flush-left `if __name__ == "__main__":` guard (single or double
# quotes, optional trailing comment), compiled against bytes so raw file
# contents can be searched without decoding.
# The order "__main__" == __name__ would also technically work, but is very
# non-idiomatic, so we ignore it.
_entry_point_re = re.compile(rb"^if __name__ +== +['\"]__main__['\"]: *(#.*)?$", re.MULTILINE)
88

89

90
def is_entry_point(content: bytes) -> bool:
    """Heuristically detect whether *content* contains a `__main__` guard.

    A regex is used instead of parsing the AST: it is much faster, catches
    almost all real cases, and only false-positives when the guard text sits
    flush-left inside a multiline string. Parsing the AST would be technically
    more correct but slower, more laborious, and trickier to keep right across
    interpreter versions.
    """
    return bool(_entry_point_re.search(content))
97

98

99
async def _find_resource_py_typed_targets(
    py_typed_files_globs: PathGlobs, all_owned_sources: AllOwnedSources
) -> list[PutativeTarget]:
    """Find resource targets that may be created after discovering any `py.typed` files."""
    discovered = await path_globs_to_paths(py_typed_files_globs)
    # Only propose targets for py.typed files no existing target already owns.
    unowned = set(discovered.files) - set(all_owned_sources)
    return [
        PutativeTarget.for_target_type(
            ResourceTarget,
            kwargs={"source": "py.typed"},
            path=dirname,
            name="py_typed",
            triggering_sources=sorted(filenames),
        )
        for dirname, filenames in group_by_dir(unowned).items()
    ]
118

119

120
async def _find_source_targets(
    py_files_globs: PathGlobs, all_owned_sources: AllOwnedSources, python_setup: PythonSetup
) -> list[PutativeTarget]:
    """Propose `python_sources`/`python_tests`/`python_test_utils` targets for unowned files.

    Files are bucketed by `classify_source_files`, then grouped per directory.
    Directories whose only source is an `__init__.py`/`__init__.pyi` are
    (optionally, per `[python].tailor_ignore_empty_init_files`) deferred: a
    target is proposed only if the file turns out to be non-empty.
    """
    result = []
    check_if_init_file_empty: dict[str, tuple[str, str]] = {}  # full_path: (dirname, filename)

    all_py_files = await path_globs_to_paths(py_files_globs)
    unowned_py_files = set(all_py_files.files) - set(all_owned_sources)
    classified_unowned_py_files = classify_source_files(unowned_py_files)
    for tgt_type, paths in classified_unowned_py_files.items():
        for dirname, filenames in group_by_dir(paths).items():
            # Test and test-util targets get conventional names; source targets
            # use the default (directory-derived) name.
            name: str | None
            if issubclass(tgt_type, PythonTestsGeneratorTarget):
                name = "tests"
            elif issubclass(tgt_type, PythonTestUtilsGeneratorTarget):
                name = "test_utils"
            else:
                name = None
            if (
                python_setup.tailor_ignore_empty_init_files
                and tgt_type == PythonSourcesGeneratorTarget
                and filenames in ({"__init__.py"}, {"__init__.pyi"})
            ):
                # Lone __init__ file: defer until we can check its contents below.
                f = next(iter(filenames))
                check_if_init_file_empty[os.path.join(dirname, f)] = (dirname, f)
            else:
                result.append(
                    PutativeTarget.for_target_type(
                        tgt_type, path=dirname, name=name, triggering_sources=sorted(filenames)
                    )
                )

    if check_if_init_file_empty:
        # Read all deferred __init__ files in one engine request.
        init_contents = await get_digest_contents(
            **implicitly(PathGlobs(check_if_init_file_empty.keys()))
        )
        for file_content in init_contents:
            # Whitespace-only files are treated as empty and skipped.
            if not file_content.content.strip():
                continue
            d, f = check_if_init_file_empty[file_content.path]
            result.append(
                PutativeTarget.for_target_type(
                    PythonSourcesGeneratorTarget, path=d, name=None, triggering_sources=[f]
                )
            )

    return result
167

168

169
@rule(level=LogLevel.DEBUG, desc="Determine candidate Python targets to create")
async def find_putative_targets(
    req: PutativePythonTargetsRequest,
    all_owned_sources: AllOwnedSources,
    python_setup: PythonSetup,
) -> PutativeTargets:
    """Top-level tailor rule for the Python backend.

    Gathers candidate targets from up to four independent passes, each gated by
    a `[python]` subsystem option: source targets, `py.typed` resource targets,
    requirements-file targets, and `pex_binary` targets for entry-point files.
    """
    pts = []
    all_py_files_globs: PathGlobs = req.path_globs("*.py", "*.pyi")

    if python_setup.tailor_source_targets:
        source_targets = await _find_source_targets(
            all_py_files_globs, all_owned_sources, python_setup
        )
        pts.extend(source_targets)

    if python_setup.tailor_py_typed_targets:
        all_py_typed_files_globs: PathGlobs = req.path_globs("py.typed")
        resource_targets = await _find_resource_py_typed_targets(
            all_py_typed_files_globs, all_owned_sources
        )
        pts.extend(resource_targets)

    if python_setup.tailor_requirements_targets:
        # Find requirements files.
        (
            all_requirements_files,
            all_pipenv_lockfile_files,
            all_pyproject_toml_contents,
        ) = await concurrently(
            get_digest_contents(**implicitly(req.path_globs("*requirements*.txt"))),
            get_digest_contents(**implicitly(req.path_globs("Pipfile.lock"))),
            get_digest_contents(**implicitly(req.path_globs("pyproject.toml"))),
        )

        def add_req_targets(files: Iterable[FileContent], alias: str, target_name: str) -> None:
            # Propose a requirements macro target for each unowned file that
            # parses cleanly; unparseable files get a warning, not a failure.
            # NOTE: mutates the enclosing `pts` list.
            contents = {i.path: i.content for i in files}
            unowned_files = set(contents) - set(all_owned_sources)
            for fp in unowned_files:
                path, name = os.path.split(fp)

                try:
                    validate(fp, contents[fp], alias, target_name)
                except Exception as e:
                    logger.warning(
                        f"An error occurred when validating `{fp}`: {e}.\n\n"
                        "You'll need to create targets for its contents manually.\n"
                        "To silence this error in future, see "
                        "https://www.pantsbuild.org/docs/reference-tailor#section-ignore-paths \n"
                    )
                    continue

                pts.append(
                    PutativeTarget(
                        path=path,
                        name=target_name,
                        type_alias=alias,
                        triggering_sources=[fp],
                        owned_sources=[name],
                        # `python_requirements` defaults its `source` field to
                        # requirements.txt, so only non-default names need an
                        # explicit `source=` kwarg.
                        kwargs=(
                            {}
                            if alias != "python_requirements" or name == "requirements.txt"
                            else {"source": name}
                        ),
                    )
                )

        def validate(path: str, contents: bytes, alias: str, target_name: str) -> None:
            # Dispatch to the parser matching the macro alias; raises on
            # malformed input (caught by add_req_targets above).
            if alias == "python_requirements":
                if path.endswith("pyproject.toml"):
                    return validate_pep621_requirements(path, contents)
                return validate_python_requirements(path, contents)
            elif alias == "pipenv_requirements":
                return validate_pipenv_requirements(contents)
            elif alias == "poetry_requirements":
                return validate_poetry_requirements(contents)

        def validate_python_requirements(path: str, contents: bytes) -> None:
            # Fully consume the generator so every line is parsed.
            for _ in parse_requirements_file(contents.decode(), rel_path=path):
                pass

        def validate_pep621_requirements(path: str, contents: bytes) -> None:
            list(parse_pep621_pyproject_toml(contents.decode(), rel_path=path))

        def validate_pipenv_requirements(contents: bytes) -> None:
            parse_pipenv_requirements(contents)

        def validate_poetry_requirements(contents: bytes) -> None:
            p = PyProjectToml(PurePath(), PurePath(), contents.decode())
            parse_pyproject_toml(p)

        add_req_targets(all_requirements_files, "python_requirements", "reqs")
        add_req_targets(all_pipenv_lockfile_files, "pipenv_requirements", "pipenv")
        # A pyproject.toml may hold poetry requirements, PEP 621 dependencies,
        # or both — so it is checked for each independently.
        add_req_targets(
            {fc for fc in all_pyproject_toml_contents if b"[tool.poetry" in fc.content},
            "poetry_requirements",
            "poetry",
        )

        def pyproject_toml_has_pep621(fc) -> bool:
            try:
                return (
                    len(list(parse_pep621_pyproject_toml(fc.content.decode(), rel_path=fc.path)))
                    > 0
                )
            except Exception:
                # Unparseable or dependency-free pyproject.toml: not PEP 621.
                return False

        add_req_targets(
            {fc for fc in all_pyproject_toml_contents if pyproject_toml_has_pep621(fc)},
            "python_requirements",
            "reqs",
        )

    if python_setup.tailor_pex_binary_targets:
        # Find binary targets.

        # Get all files whose content indicates that they are entry points or are __main__.py files.
        digest_contents = await get_digest_contents(**implicitly({all_py_files_globs: PathGlobs}))
        all_main_py = await path_globs_to_paths(req.path_globs("__main__.py"))
        entry_points = [
            file_content.path
            for file_content in digest_contents
            if is_entry_point(file_content.content)
        ] + list(all_main_py.files)

        # Get the modules for these entry points.
        src_roots = await get_source_roots(SourceRootsRequest.for_files(entry_points))
        module_to_entry_point = {}
        for entry_point in entry_points:
            entry_point_path = PurePath(entry_point)
            src_root = src_roots.path_to_root[entry_point_path]
            stripped_entry_point = entry_point_path.relative_to(src_root.path)
            module = module_from_stripped_path(stripped_entry_point)
            module_to_entry_point[module] = entry_point

        # Get existing binary targets for these entry points.
        entry_point_dirs = {os.path.dirname(entry_point) for entry_point in entry_points}
        possible_existing_binary_targets = await resolve_unexpanded_targets(
            **implicitly(
                RawSpecs(
                    ancestor_globs=tuple(AncestorGlobSpec(d) for d in entry_point_dirs),
                    description_of_origin="the `pex_binary` tailor rule",
                )
            )
        )

        # Resolve their entry points concurrently so we can compare by module.
        possible_existing_binary_entry_points = await concurrently(
            resolve_pex_entry_point(ResolvePexEntryPointRequest(t[PexEntryPointField]))
            for t in possible_existing_binary_targets
            if t.has_field(PexEntryPointField)
        )
        possible_existing_entry_point_modules = {
            rep.val.module for rep in possible_existing_binary_entry_points if rep.val
        }
        unowned_entry_point_modules = (
            module_to_entry_point.keys() - possible_existing_entry_point_modules
        )

        # Generate new targets for entry points that don't already have one.
        for entry_point_module in unowned_entry_point_modules:
            entry_point = module_to_entry_point[entry_point_module]
            path, fname = os.path.split(entry_point)
            # Target name is the filename without its extension.
            name = os.path.splitext(fname)[0]
            pts.append(
                PutativeTarget.for_target_type(
                    target_type=PexBinary,
                    path=path,
                    name=name,
                    triggering_sources=tuple(),
                    kwargs={"entry_point": fname},
                )
            )

    return PutativeTargets(pts)
343

344

345
def rules():
    """Return this backend's tailor rules plus the union membership that
    routes `PutativeTargetsRequest` to the Python-specific rule."""
    union_membership = UnionRule(PutativeTargetsRequest, PutativePythonTargetsRequest)
    return [*collect_rules(), union_membership]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc