• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 21803785359

08 Feb 2026 07:13PM UTC coverage: 43.3% (-37.0%) from 80.277%
21803785359

Pull #23085

github

web-flow
Merge 7c1cd926d into 40389cc58
Pull Request #23085: A helper method for indexing paths by source root

2 of 6 new or added lines in 1 file covered. (33.33%)

17114 existing lines in 539 files now uncovered.

26075 of 60219 relevant lines covered (43.3%)

0.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.43
/src/python/pants/source/source_root.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
from collections import defaultdict
1✔
7
import itertools
1✔
8
import logging
1✔
9
import os
1✔
10
from collections.abc import Iterable
1✔
11
from dataclasses import dataclass
1✔
12
from pathlib import PurePath
1✔
13

14
from pants.build_graph.address import Address
1✔
15
from pants.engine.collection import DeduplicatedCollection
1✔
16
from pants.engine.engine_aware import EngineAwareParameter
1✔
17
from pants.engine.fs import PathGlobs
1✔
18
from pants.engine.intrinsics import path_globs_to_paths
1✔
19
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
1✔
20
from pants.engine.target import Target
1✔
21
from pants.option.option_types import StrListOption
1✔
22
from pants.option.subsystem import Subsystem
1✔
23
from pants.util.docutil import doc_url
1✔
24
from pants.util.frozendict import FrozenDict
1✔
25
from pants.util.logging import LogLevel
1✔
26
from pants.util.memo import memoized_method
1✔
27
from pants.util.strutil import softwrap
1✔
28

29
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
1✔
30

31

32
@dataclass(frozen=True, order=True)
class SourceRoot:
    """A source root, identified by its path.

    Instances are immutable and orderable (ordering compares `path`).
    """

    # Relative path from the buildroot.  Note that a source root at the buildroot
    # is represented as ".".
    path: str
1✔
37

38

39
@dataclass(frozen=True)
class OptionalSourceRoot:
    """Wrapper for a source root lookup result that may be absent."""

    # The source root that was found, or None if there is none.
    source_root: SourceRoot | None
1✔
42

43

44
class SourceRootError(Exception):
    """An error related to SourceRoot computation."""

    def __init__(self, msg: str):
        """Build the error text from `msg` followed by a pointer to the source roots docs.

        Args:
            msg: The specific problem. Trailing whitespace is ignored; exactly one space is
                inserted between it and the documentation pointer.
        """
        # Callers in this module disagree on whether `msg` ends with a space, so normalise
        # here; otherwise the message reads e.g. "...base name: fooSee https://...".
        prefix = msg.rstrip()
        separator = " " if prefix else ""
        super().__init__(
            f"{prefix}{separator}See {doc_url('docs/using-pants/key-concepts/source-roots')} for how to define source roots."
        )
51

52

53
class InvalidSourceRootPatternError(SourceRootError):
    """Indicates an invalid pattern was provided."""
55

56

57
class InvalidMarkerFileError(SourceRootError):
    """Indicates an invalid marker file was provided."""
59

60

61
class NoSourceRootError(SourceRootError):
    """Indicates we failed to map a source file to a source root."""

    def __init__(self, path: str | PurePath, extra_msg: str = ""):
        """Report that `path` has no source root.

        Args:
            path: The path for which no source root was found.
            extra_msg: Optional additional text appended after the standard sentence.
        """
        super().__init__(f"No source root found for `{path}`. {extra_msg}")
×
66

67

68
# We perform pattern matching against absolute paths, where "/" represents the repo root.
_repo_root = PurePath(os.path.sep)
1✔
70

71

72
@dataclass(frozen=True)
class SourceRootPatternMatcher:
    """Tests putative source root paths against a fixed set of root patterns."""

    root_patterns: tuple[str, ...]

    def __post_init__(self) -> None:
        """Reject any pattern that has `..` as one of its path segments.

        Raises:
            InvalidSourceRootPatternError: If a pattern contains a `..` segment.
        """
        for root_pattern in self.root_patterns:
            if ".." in root_pattern.split(os.path.sep):
                raise InvalidSourceRootPatternError(
                    f"`..` disallowed in source root pattern: {root_pattern}."
                )

    def get_patterns(self) -> tuple[str, ...]:
        """Return the configured patterns as a tuple."""
        return tuple(self.root_patterns)

    def matches_root_patterns(self, relpath: PurePath) -> bool:
        """Does this putative root match a pattern?"""
        # Note: This is currently O(n) where n is the number of patterns, which
        # we expect to be small.  We can optimize if it becomes necessary.
        # The path is joined onto the module-level `_repo_root` before matching.
        putative_root = _repo_root / relpath
        return any(putative_root.match(pattern) for pattern in self.root_patterns)
×
95

96

97
class SourceRootConfig(Subsystem):
    # Options for locating source roots, registered under the `source` scope.
    options_scope = "source"
    help = "Configuration for roots of source trees."

    # Used as the default value of the `root_patterns` option below.
    DEFAULT_ROOT_PATTERNS = [
        "/",
        "src",
        "src/python",
        "src/py",
        "src/thrift",
        "src/protobuf",
        "src/protos",
        "src/scala",
        "src/java",
    ]

    root_patterns = StrListOption(
        default=DEFAULT_ROOT_PATTERNS,
        help=softwrap(
            f"""
            A list of source root suffixes.

            A directory with this suffix will be considered a potential source root.
            E.g., `src/python` will match `<buildroot>/src/python`, `<buildroot>/project1/src/python`
            etc.

            Prepend a `/` to anchor the match at the buildroot.
            E.g., `/src/python` will match `<buildroot>/src/python` but not `<buildroot>/project1/src/python`.

            A `*` wildcard will match a single path segment,
            E.g., `src/*` will match `<buildroot>/src/python` and `<buildroot>/src/rust`.

            Use `/` to signify that the buildroot itself is a source root.

            See {doc_url("docs/using-pants/key-concepts/source-roots")}.
            """
        ),
        advanced=True,
        metavar='["pattern1", "pattern2", ...]',
    )
    marker_filenames = StrListOption(
        help=softwrap(
            """
            The presence of a file of this name in a directory indicates that the directory
            is a source root. The content of the file doesn't matter, and may be empty.
            Useful when you can't or don't wish to centrally enumerate source roots via
            `root_patterns`.
            """
        ),
        advanced=True,
        metavar="filename",
    )

    @memoized_method
    def get_pattern_matcher(self) -> SourceRootPatternMatcher:
        # Builds the matcher from the current `root_patterns` option value.
        return SourceRootPatternMatcher(self.root_patterns)
×
153

154

155
@dataclass(frozen=True)
class SourceRootsRequest:
    """Find the source roots for the given files and/or dirs."""

    files: tuple[PurePath, ...]
    dirs: tuple[PurePath, ...]

    def __init__(self, files: Iterable[PurePath], dirs: Iterable[PurePath]) -> None:
        """Store `files` and `dirs` as sorted tuples, then validate them.

        Raises:
            ValueError: If any path contains a `..` segment or is absolute.
        """
        # The dataclass is frozen, so fields must be set via object.__setattr__.
        object.__setattr__(self, "files", tuple(sorted(files)))
        object.__setattr__(self, "dirs", tuple(sorted(dirs)))

        # This hand-written __init__ takes the place of the generated one, so the
        # validation hook has to be invoked explicitly.
        self.__post_init__()

    def __post_init__(self) -> None:
        """Validate that every requested path is relative and has no `..` segment."""
        for path in itertools.chain(self.files, self.dirs):
            if ".." in str(path).split(os.path.sep):
                raise ValueError(f"SourceRootRequest cannot contain `..` segment: {path}")
            if path.is_absolute():
                raise ValueError(f"SourceRootRequest path must be relative: {path}")

    @classmethod
    def for_files(cls, file_paths: Iterable[str]) -> SourceRootsRequest:
        """Create a request for the source root for the given files."""
        # Building a set first drops duplicate paths; the constructor sorts the result.
        return cls({PurePath(file_path) for file_path in file_paths}, ())
×
179

180

181
@dataclass(frozen=True)
class SourceRootRequest(EngineAwareParameter):
    """Find the source root for the given path.

    If you have multiple paths, particularly if many of them share parent directories, you'll get
    better performance with a `SourceRootsRequest` (see above) instead.
    """

    path: PurePath

    def __post_init__(self) -> None:
        """Validate that the path is relative and has no `..` segment.

        Raises:
            ValueError: If the path contains a `..` segment or is absolute.
        """
        if ".." in str(self.path).split(os.path.sep):
            raise ValueError(f"SourceRootRequest cannot contain `..` segment: {self.path}")
        if self.path.is_absolute():
            raise ValueError(f"SourceRootRequest path must be relative: {self.path}")

    @classmethod
    def for_file(cls, file_path: str) -> SourceRootRequest:
        """Create a request for the source root for the given file."""
        # The file itself cannot be a source root, so we may as well start the search
        # from its enclosing directory, and save on some superfluous checking.
        return cls(PurePath(file_path).parent)

    @classmethod
    def for_address(cls, address: Address) -> SourceRootRequest:
        """Create a request for the source root of the address's `spec_path`."""
        # Note that we don't use for_file() here because the spec_path is a directory.
        return cls(PurePath(address.spec_path))

    @classmethod
    def for_target(cls, target: Target) -> SourceRootRequest:
        """Create a request for the source root of the target's address."""
        return cls.for_address(target.address)

    def debug_hint(self) -> str:
        """Return the requested path as a string."""
        return str(self.path)
×
215

216

217
@dataclass(frozen=True)
class SourceRootsResult:
    """The source root found for each requested path."""

    path_to_root: FrozenDict[PurePath, SourceRoot]

    def root_to_paths(self) -> FrozenDict[SourceRoot, tuple[PurePath, ...]]:
        """Invert `path_to_root`, grouping paths under the source root they map to.

        Returns:
            A mapping from each source root to a sorted tuple of the paths assigned to it.
        """
        paths_by_root: defaultdict[SourceRoot, list[PurePath]] = defaultdict(list)
        # Iterating a mapping directly yields only its keys, so `.items()` is needed
        # to obtain (path, root) pairs.
        for path, root in self.path_to_root.items():
            paths_by_root[root].append(path)
        return FrozenDict(
            (root, tuple(sorted(paths))) for root, paths in paths_by_root.items()
        )
×
226

227

228
@dataclass(frozen=True)
class OptionalSourceRootsResult:
    """The optional source root found for each requested path."""

    # Maps each requested path to its lookup result, whose source root may be None.
    path_to_optional_root: FrozenDict[PurePath, OptionalSourceRoot]
1✔
231

232

233
@rule
async def get_optional_source_root(
    source_root_request: SourceRootRequest, source_root_config: SourceRootConfig
) -> OptionalSourceRoot:
    """Rule to request a SourceRoot that may not exist.

    Checks the requested path against the configured patterns, then for marker files
    inside it, and otherwise repeats the search on the parent directory until "." has
    been checked.

    Raises:
        InvalidMarkerFileError: If a configured marker filename is not a plain base name
            or contains `*` or `!`.
    """
    pattern_matcher = source_root_config.get_pattern_matcher()
    path = source_root_request.path

    # Check if the requested path itself is a source root.

    # A) Does it match a pattern?
    if pattern_matcher.matches_root_patterns(path):
        return OptionalSourceRoot(SourceRoot(str(path)))

    # B) Does it contain a marker file?
    marker_filenames = source_root_config.marker_filenames
    if marker_filenames:
        for marker_filename in marker_filenames:
            if (
                os.path.basename(marker_filename) != marker_filename
                or "*" in marker_filename
                or "!" in marker_filename
            ):
                raise InvalidMarkerFileError(
                    f"Marker filename must be a base name: {marker_filename}"
                )
        paths = await path_globs_to_paths(PathGlobs([str(path / mf) for mf in marker_filenames]))
        if len(paths.files) > 0:
            return OptionalSourceRoot(SourceRoot(str(path)))

    # The requested path itself is not a source root, but maybe its parent is.
    if str(path) != ".":
        return await get_optional_source_root(SourceRootRequest(path.parent), **implicitly())

    # The requested path is not under a source root.
    return OptionalSourceRoot(None)
×
269

270

271
@rule
async def get_optional_source_roots(
    source_roots_request: SourceRootsRequest,
) -> OptionalSourceRootsResult:
    """Rule to request source roots that may not exist."""
    # A file cannot be a source root, so request for its parent.
    # In the typical case, where we have multiple files with the same parent, this can
    # dramatically cut down on the number of engine requests.
    dirs: set[PurePath] = set(source_roots_request.dirs)
    file_to_dir: dict[PurePath, PurePath] = {
        file: file.parent for file in source_roots_request.files
    }
    dirs.update(file_to_dir.values())

    roots = await concurrently(
        get_optional_source_root(SourceRootRequest(d), **implicitly()) for d in dirs
    )
    # `dirs` is not modified between the two iterations, so pairing by position is safe.
    dir_to_root = dict(zip(dirs, roots))

    # Map every originally requested path (dir or file) to the result for its directory.
    path_to_optional_root: dict[PurePath, OptionalSourceRoot] = {}
    for d in source_roots_request.dirs:
        path_to_optional_root[d] = dir_to_root[d]
    for f, d in file_to_dir.items():
        path_to_optional_root[f] = dir_to_root[d]

    return OptionalSourceRootsResult(path_to_optional_root=FrozenDict(path_to_optional_root))
×
297

298

299
@rule
async def get_source_roots(source_roots_request: SourceRootsRequest) -> SourceRootsResult:
    """Convenience rule to allow callers to request SourceRoots that must exist.

    That way callers don't have to unpack OptionalSourceRoots if they know they expect a SourceRoot
    to exist and are willing to error if it doesn't.

    Raises:
        NoSourceRootError: If any requested path has no source root.
    """
    osrr = await get_optional_source_roots(source_roots_request)
    path_to_root: dict[PurePath, SourceRoot] = {}
    for path, osr in osrr.path_to_optional_root.items():
        if osr.source_root is None:
            raise NoSourceRootError(path)
        path_to_root[path] = osr.source_root
    return SourceRootsResult(path_to_root=FrozenDict(path_to_root))
×
313

314

315
@rule
async def get_source_root(source_root_request: SourceRootRequest) -> SourceRoot:
    """Convenience rule to allow callers to request a SourceRoot directly.

    That way callers don't have to unpack an OptionalSourceRoot if they know they expect a
    SourceRoot to exist and are willing to error if it doesn't.

    Raises:
        NoSourceRootError: If the requested path has no source root.
    """
    optional_source_root = await get_optional_source_root(source_root_request, **implicitly())
    if optional_source_root.source_root is None:
        raise NoSourceRootError(source_root_request.path)
    return optional_source_root.source_root
×
326

327

328
class AllSourceRoots(DeduplicatedCollection[SourceRoot]):
    # Collection type returned by the `all_roots` rule.
    sort_input = True
1✔
330

331

332
@rule(desc="Compute all source roots", level=LogLevel.DEBUG)
async def all_roots(source_root_config: SourceRootConfig) -> AllSourceRoots:
    """Compute the source roots that actually exist, from root patterns and marker files."""
    source_root_pattern_matcher = source_root_config.get_pattern_matcher()

    # Create globs corresponding to all source root patterns.
    pattern_matches: set[str] = set()
    for path in source_root_pattern_matcher.get_patterns():
        if path == "/":
            pattern_matches.add("**")
        elif path.startswith("/"):
            # Anchored pattern: drop the leading "/" so the glob has no "**/" prefix.
            pattern_matches.add(f"{path[1:]}/")
        else:
            pattern_matches.add(f"**/{path}/")

    # Create globs for any marker files.
    marker_file_matches: set[str] = set()
    for marker_filename in source_root_config.marker_filenames:
        marker_file_matches.add(f"**/{marker_filename}")

    # Match the patterns against actual files, to find the roots that actually exist.
    pattern_paths, marker_paths = await concurrently(
        path_globs_to_paths(PathGlobs(globs=sorted(pattern_matches))),
        path_globs_to_paths(PathGlobs(globs=sorted(marker_file_matches))),
    )

    responses = await concurrently(
        itertools.chain(
            (
                get_optional_source_root(SourceRootRequest(PurePath(d)), **implicitly())
                for d in pattern_paths.dirs
            ),
            # We don't technically need to issue a SourceRootRequest for the marker files,
            # since we know that their immediately enclosing dir is a source root by definition.
            # However we may as well verify this formally, so that we're not replicating that
            # logic here.
            (
                get_optional_source_root(SourceRootRequest(PurePath(f)), **implicitly())
                for f in marker_paths.files
            ),
        )
    )
    # Keep only the lookups that found a source root; the set drops duplicates.
    all_source_roots = {
        response.source_root for response in responses if response.source_root is not None
    }
    return AllSourceRoots(all_source_roots)
×
377

378

379
def rules():
    """Return the result of `collect_rules()` for this module."""
    return collect_rules()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc