• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25441711719

06 May 2026 02:31PM UTC coverage: 92.915%. Remained the same
25441711719

push

github

web-flow
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Action best practices we recently enabled at #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next followup would then be upgrades and
harmonizing the generated and none-generated pins.

Notice: This idea was suggested by Claude while going over pinact output
and I was surprised to see that post processing the yaml wasn't too
gross.

92206 of 99237 relevant lines covered (92.91%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/python/pants/source/source_root.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import itertools
12✔
7
import logging
12✔
8
import os
12✔
9
from collections import defaultdict
12✔
10
from collections.abc import Iterable
12✔
11
from dataclasses import dataclass
12✔
12
from pathlib import PurePath
12✔
13

14
from pants.build_graph.address import Address
12✔
15
from pants.engine.collection import DeduplicatedCollection
12✔
16
from pants.engine.engine_aware import EngineAwareParameter
12✔
17
from pants.engine.fs import PathGlobs
12✔
18
from pants.engine.intrinsics import path_globs_to_paths
12✔
19
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
12✔
20
from pants.engine.target import Target
12✔
21
from pants.option.option_types import StrListOption
12✔
22
from pants.option.subsystem import Subsystem
12✔
23
from pants.util.docutil import doc_url
12✔
24
from pants.util.frozendict import FrozenDict
12✔
25
from pants.util.logging import LogLevel
12✔
26
from pants.util.memo import memoized_method
12✔
27
from pants.util.strutil import softwrap
12✔
28

29
logger = logging.getLogger(__name__)
12✔
30

31

32
@dataclass(frozen=True, order=True)
12✔
33
class SourceRoot:
12✔
34
    # Relative path from the buildroot.  Note that a source root at the buildroot
35
    # is represented as ".".
36
    path: str
12✔
37

38

39
@dataclass(frozen=True)
12✔
40
class OptionalSourceRoot:
12✔
41
    source_root: SourceRoot | None
12✔
42

43

44
class SourceRootError(Exception):
12✔
45
    """An error related to SourceRoot computation."""
46

47
    def __init__(self, msg: str):
12✔
48
        super().__init__(
1✔
49
            f"{msg}See {doc_url('docs/using-pants/key-concepts/source-roots')} for how to define source roots."
50
        )
51

52

53
class InvalidSourceRootPatternError(SourceRootError):
12✔
54
    """Indicates an invalid pattern was provided."""
55

56

57
class InvalidMarkerFileError(SourceRootError):
12✔
58
    """Indicates an invalid marker file was provided."""
59

60

61
class NoSourceRootError(SourceRootError):
12✔
62
    """Indicates we failed to map a source file to a source root."""
63

64
    def __init__(self, path: str | PurePath, extra_msg: str = ""):
12✔
65
        super().__init__(f"No source root found for `{path}`. {extra_msg}")
1✔
66

67

68
# We perform pattern matching against absolute paths, where "/" represents the repo root.
69
_repo_root = PurePath(os.path.sep)
12✔
70

71

72
@dataclass(frozen=True)
12✔
73
class SourceRootPatternMatcher:
12✔
74
    root_patterns: tuple[str, ...]
12✔
75

76
    def __post_init__(self) -> None:
12✔
77
        for root_pattern in self.root_patterns:
12✔
78
            if ".." in root_pattern.split(os.path.sep):
12✔
79
                raise InvalidSourceRootPatternError(
×
80
                    f"`..` disallowed in source root pattern: {root_pattern}."
81
                )
82

83
    def get_patterns(self) -> tuple[str, ...]:
12✔
84
        return tuple(self.root_patterns)
2✔
85

86
    def matches_root_patterns(self, relpath: PurePath) -> bool:
12✔
87
        """Does this putative root match a pattern?"""
88
        # Note: This is currently O(n) where n is the number of patterns, which
89
        # we expect to be small.  We can optimize if it becomes necessary.
90
        putative_root = _repo_root / relpath
12✔
91
        for pattern in self.root_patterns:
12✔
92
            if putative_root.match(pattern):
12✔
93
                return True
12✔
94
        return False
12✔
95

96

97
class SourceRootConfig(Subsystem):
12✔
98
    options_scope = "source"
12✔
99
    help = "Configuration for roots of source trees."
12✔
100

101
    DEFAULT_ROOT_PATTERNS = [
12✔
102
        "/",
103
        "src",
104
        "src/python",
105
        "src/py",
106
        "src/thrift",
107
        "src/protobuf",
108
        "src/protos",
109
        "src/scala",
110
        "src/java",
111
    ]
112

113
    root_patterns = StrListOption(
12✔
114
        default=DEFAULT_ROOT_PATTERNS,
115
        help=softwrap(
116
            f"""
117
            A list of source root suffixes.
118

119
            A directory with this suffix will be considered a potential source root.
120
            E.g., `src/python` will match `<buildroot>/src/python`, `<buildroot>/project1/src/python`
121
            etc.
122

123
            Prepend a `/` to anchor the match at the buildroot.
124
            E.g., `/src/python` will match `<buildroot>/src/python` but not `<buildroot>/project1/src/python`.
125

126
            A `*` wildcard will match a single path segment,
127
            E.g., `src/*` will match `<buildroot>/src/python` and `<buildroot>/src/rust`.
128

129
            Use `/` to signify that the buildroot itself is a source root.
130

131
            See {doc_url("docs/using-pants/key-concepts/source-roots")}.
132
            """
133
        ),
134
        advanced=True,
135
        metavar='["pattern1", "pattern2", ...]',
136
    )
137
    marker_filenames = StrListOption(
12✔
138
        help=softwrap(
139
            """
140
            The presence of a file of this name in a directory indicates that the directory
141
            is a source root. The content of the file doesn't matter, and may be empty.
142
            Useful when you can't or don't wish to centrally enumerate source roots via
143
            `root_patterns`.
144
            """
145
        ),
146
        advanced=True,
147
        metavar="filename",
148
    )
149

150
    @memoized_method
12✔
151
    def get_pattern_matcher(self) -> SourceRootPatternMatcher:
12✔
152
        return SourceRootPatternMatcher(self.root_patterns)
12✔
153

154

155
@dataclass(frozen=True)
12✔
156
class SourceRootsRequest:
12✔
157
    """Find the source roots for the given files and/or dirs."""
158

159
    files: tuple[PurePath, ...]
12✔
160
    dirs: tuple[PurePath, ...]
12✔
161

162
    def __init__(self, files: Iterable[PurePath], dirs: Iterable[PurePath]) -> None:
12✔
163
        object.__setattr__(self, "files", tuple(sorted(files)))
12✔
164
        object.__setattr__(self, "dirs", tuple(sorted(dirs)))
12✔
165

166
        self.__post_init__()
12✔
167

168
    def __post_init__(self) -> None:
12✔
169
        for path in itertools.chain(self.files, self.dirs):
12✔
170
            if ".." in str(path).split(os.path.sep):
12✔
171
                raise ValueError(f"SourceRootRequest cannot contain `..` segment: {path}")
×
172
            if path.is_absolute():
12✔
173
                raise ValueError(f"SourceRootRequest path must be relative: {path}")
×
174

175
    @classmethod
12✔
176
    def for_files(cls, file_paths: Iterable[str]) -> SourceRootsRequest:
12✔
177
        """Create a request for the source root for the given files."""
178
        return cls({PurePath(file_path) for file_path in file_paths}, ())
12✔
179

180

181
@dataclass(frozen=True)
12✔
182
class SourceRootRequest(EngineAwareParameter):
12✔
183
    """Find the source root for the given path.
184

185
    If you have multiple paths, particularly if many of them share parent directories, you'll get
186
    better performance with a `SourceRootsRequest` (see above) instead.
187
    """
188

189
    path: PurePath
12✔
190

191
    def __post_init__(self) -> None:
12✔
192
        if ".." in str(self.path).split(os.path.sep):
12✔
193
            raise ValueError(f"SourceRootRequest cannot contain `..` segment: {self.path}")
1✔
194
        if self.path.is_absolute():
12✔
195
            raise ValueError(f"SourceRootRequest path must be relative: {self.path}")
×
196

197
    @classmethod
12✔
198
    def for_file(cls, file_path: str) -> SourceRootRequest:
12✔
199
        """Create a request for the source root for the given file."""
200
        # The file itself cannot be a source root, so we may as well start the search
201
        # from its enclosing directory, and save on some superfluous checking.
202
        return cls(PurePath(file_path).parent)
12✔
203

204
    @classmethod
12✔
205
    def for_address(cls, address: Address) -> SourceRootRequest:
12✔
206
        # Note that we don't use for_file() here because the spec_path is a directory.
207
        return cls(PurePath(address.spec_path))
12✔
208

209
    @classmethod
12✔
210
    def for_target(cls, target: Target) -> SourceRootRequest:
12✔
211
        return cls.for_address(target.address)
12✔
212

213
    def debug_hint(self) -> str:
12✔
214
        return str(self.path)
×
215

216

217
@dataclass(frozen=True)
12✔
218
class SourceRootsResult:
12✔
219
    path_to_root: FrozenDict[PurePath, SourceRoot]
12✔
220

221
    def root_to_paths(self) -> FrozenDict[SourceRoot, tuple[PurePath, ...]]:
12✔
222
        res = defaultdict(list)
2✔
223
        for path, root in self.path_to_root.items():
2✔
224
            res[root].append(path)
2✔
225
        return FrozenDict((k, tuple(sorted(v))) for k, v in res.items())
2✔
226

227

228
@dataclass(frozen=True)
12✔
229
class OptionalSourceRootsResult:
12✔
230
    path_to_optional_root: FrozenDict[PurePath, OptionalSourceRoot]
12✔
231

232

233
@rule
12✔
234
async def get_optional_source_root(
12✔
235
    source_root_request: SourceRootRequest, source_root_config: SourceRootConfig
236
) -> OptionalSourceRoot:
237
    """Rule to request a SourceRoot that may not exist."""
238
    pattern_matcher = source_root_config.get_pattern_matcher()
12✔
239
    path = source_root_request.path
12✔
240

241
    # Check if the requested path itself is a source root.
242

243
    # A) Does it match a pattern?
244
    if pattern_matcher.matches_root_patterns(path):
12✔
245
        return OptionalSourceRoot(SourceRoot(str(path)))
12✔
246

247
    # B) Does it contain a marker file?
248
    marker_filenames = source_root_config.marker_filenames
12✔
249
    if marker_filenames:
12✔
250
        for marker_filename in marker_filenames:
2✔
251
            if (
2✔
252
                os.path.basename(marker_filename) != marker_filename
253
                or "*" in marker_filename
254
                or "!" in marker_filename
255
            ):
256
                raise InvalidMarkerFileError(
×
257
                    f"Marker filename must be a base name: {marker_filename}"
258
                )
259
        paths = await path_globs_to_paths(PathGlobs([str(path / mf) for mf in marker_filenames]))
2✔
260
        if len(paths.files) > 0:
2✔
261
            return OptionalSourceRoot(SourceRoot(str(path)))
2✔
262

263
    # The requested path itself is not a source root, but maybe its parent is.
264
    if str(path) != ".":
12✔
265
        return await get_optional_source_root(SourceRootRequest(path.parent), **implicitly())
12✔
266

267
    # The requested path is not under a source root.
268
    return OptionalSourceRoot(None)
2✔
269

270

271
@rule
12✔
272
async def get_optional_source_roots(
12✔
273
    source_roots_request: SourceRootsRequest,
274
) -> OptionalSourceRootsResult:
275
    """Rule to request source roots that may not exist."""
276
    # A file cannot be a source root, so request for its parent.
277
    # In the typical case, where we have multiple files with the same parent, this can
278
    # dramatically cut down on the number of engine requests.
279
    dirs: set[PurePath] = set(source_roots_request.dirs)
12✔
280
    file_to_dir: dict[PurePath, PurePath] = {
12✔
281
        file: file.parent for file in source_roots_request.files
282
    }
283
    dirs.update(file_to_dir.values())
12✔
284

285
    roots = await concurrently(
12✔
286
        get_optional_source_root(SourceRootRequest(d), **implicitly()) for d in dirs
287
    )
288
    dir_to_root = dict(zip(dirs, roots))
12✔
289

290
    path_to_optional_root: dict[PurePath, OptionalSourceRoot] = {}
12✔
291
    for d in source_roots_request.dirs:
12✔
292
        path_to_optional_root[d] = dir_to_root[d]
4✔
293
    for f, d in file_to_dir.items():
12✔
294
        path_to_optional_root[f] = dir_to_root[d]
12✔
295

296
    return OptionalSourceRootsResult(path_to_optional_root=FrozenDict(path_to_optional_root))
12✔
297

298

299
@rule
12✔
300
async def get_source_roots(source_roots_request: SourceRootsRequest) -> SourceRootsResult:
12✔
301
    """Convenience rule to allow callers to request SourceRoots that must exist.
302

303
    That way callers don't have to unpack OptionalSourceRoots if they know they expect a SourceRoot
304
    to exist and are willing to error if it doesn't.
305
    """
306
    osrr = await get_optional_source_roots(source_roots_request)
12✔
307
    path_to_root = {}
12✔
308
    for path, osr in osrr.path_to_optional_root.items():
12✔
309
        if osr.source_root is None:
12✔
310
            raise NoSourceRootError(path)
1✔
311
        path_to_root[path] = osr.source_root
12✔
312
    return SourceRootsResult(path_to_root=FrozenDict(path_to_root))
12✔
313

314

315
@rule
12✔
316
async def get_source_root(source_root_request: SourceRootRequest) -> SourceRoot:
12✔
317
    """Convenience rule to allow callers to request a SourceRoot directly.
318

319
    That way callers don't have to unpack an OptionalSourceRoot if they know they expect a
320
    SourceRoot to exist and are willing to error if it doesn't.
321
    """
322
    optional_source_root = await get_optional_source_root(source_root_request, **implicitly())
12✔
323
    if optional_source_root.source_root is None:
12✔
324
        raise NoSourceRootError(source_root_request.path)
1✔
325
    return optional_source_root.source_root
12✔
326

327

328
class AllSourceRoots(DeduplicatedCollection[SourceRoot]):
12✔
329
    sort_input = True
12✔
330

331

332
@rule(desc="Compute all source roots", level=LogLevel.DEBUG)
12✔
333
async def all_roots(source_root_config: SourceRootConfig) -> AllSourceRoots:
12✔
334
    source_root_pattern_matcher = source_root_config.get_pattern_matcher()
2✔
335

336
    # Create globs corresponding to all source root patterns.
337
    pattern_matches: set[str] = set()
2✔
338
    for path in source_root_pattern_matcher.get_patterns():
2✔
339
        if path == "/":
2✔
340
            pattern_matches.add("**")
2✔
341
        elif path.startswith("/"):
2✔
342
            pattern_matches.add(f"{path[1:]}/")
×
343
        else:
344
            pattern_matches.add(f"**/{path}/")
2✔
345

346
    # Create globs for any marker files.
347
    marker_file_matches: set[str] = set()
2✔
348
    for marker_filename in source_root_config.marker_filenames:
2✔
349
        marker_file_matches.add(f"**/{marker_filename}")
1✔
350

351
    # Match the patterns against actual files, to find the roots that actually exist.
352
    pattern_paths, marker_paths = await concurrently(
2✔
353
        path_globs_to_paths(PathGlobs(globs=sorted(pattern_matches))),
354
        path_globs_to_paths(PathGlobs(globs=sorted(marker_file_matches))),
355
    )
356

357
    responses = await concurrently(
2✔
358
        itertools.chain(
359
            (
360
                get_optional_source_root(SourceRootRequest(PurePath(d)), **implicitly())
361
                for d in pattern_paths.dirs
362
            ),
363
            # We don't technically need to issue a SourceRootRequest for the marker files,
364
            # since we know that their immediately enclosing dir is a source root by definition.
365
            # However we may as well verify this formally, so that we're not replicating that
366
            # logic here.
367
            (
368
                get_optional_source_root(SourceRootRequest(PurePath(f)), **implicitly())
369
                for f in marker_paths.files
370
            ),
371
        )
372
    )
373
    all_source_roots = {
2✔
374
        response.source_root for response in responses if response.source_root is not None
375
    }
376
    return AllSourceRoots(all_source_roots)
2✔
377

378

379
def rules():
12✔
380
    return collect_rules()
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc