• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25441711719

06 May 2026 02:31PM UTC coverage: 92.915%. Remained the same
25441711719

push

github

web-flow
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Action best practices we recently enabled at #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next follow-up would then be upgrades and
harmonizing the generated and non-generated pins.

Notice: This idea was suggested by Claude while going over pinact output
and I was surprised to see that post-processing the YAML wasn't too
gross.

92206 of 99237 relevant lines covered (92.91%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/src/python/pants/testutil/rule_runner.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import atexit
12✔
7
import dataclasses
12✔
8
import difflib
12✔
9
import functools
12✔
10
import inspect
12✔
11
import os
12✔
12
import re
12✔
13
import sys
12✔
14
import warnings
12✔
15
from collections.abc import Callable, Coroutine, Generator, Iterable, Iterator, Mapping, Sequence
12✔
16
from contextlib import contextmanager
12✔
17
from dataclasses import dataclass
12✔
18
from io import StringIO
12✔
19
from pathlib import Path, PurePath
12✔
20
from tempfile import mkdtemp
12✔
21
from typing import Any, TypeVar, cast, overload
12✔
22

23
from pants.base.build_environment import get_buildroot
12✔
24
from pants.base.build_root import BuildRoot
12✔
25
from pants.base.specs_parser import SpecsParser
12✔
26
from pants.build_graph import build_configuration
12✔
27
from pants.build_graph.build_configuration import BuildConfiguration
12✔
28
from pants.build_graph.build_file_aliases import BuildFileAliases
12✔
29
from pants.core.goals.run import generate_run_request
12✔
30
from pants.core.util_rules import adhoc_binaries, misc
12✔
31
from pants.engine.addresses import Address
12✔
32
from pants.engine.console import Console
12✔
33
from pants.engine.env_vars import CompleteEnvironmentVars
12✔
34
from pants.engine.environment import EnvironmentName
12✔
35
from pants.engine.fs import CreateDigest, Digest, FileContent, Snapshot, Workspace
12✔
36
from pants.engine.goal import CurrentExecutingGoals, Goal
12✔
37
from pants.engine.internals import native_engine, options_parsing
12✔
38
from pants.engine.internals.native_engine import (
12✔
39
    ProcessExecutionEnvironment,
40
    PyExecutor,
41
    _Concurrently,
42
)
43
from pants.engine.internals.scheduler import ExecutionError, Scheduler, SchedulerSession
12✔
44
from pants.engine.internals.selectors import Call, Params
12✔
45
from pants.engine.internals.session import SessionValues
12✔
46
from pants.engine.platform import Platform
12✔
47
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
12✔
48
from pants.engine.rules import QueryRule as QueryRule
12✔
49
from pants.engine.target import AllTargets, Target, WrappedTarget, WrappedTargetRequest
12✔
50
from pants.engine.unions import UnionMembership, UnionRule
12✔
51
from pants.goal.auxiliary_goal import AuxiliaryGoal
12✔
52
from pants.init.engine_initializer import EngineInitializer
12✔
53
from pants.init.logging import initialize_stdio, initialize_stdio_raw, stdio_destination
12✔
54
from pants.option.bootstrap_options import DynamicRemoteOptions, ExecutionOptions, LocalStoreOptions
12✔
55
from pants.option.global_options import GlobalOptions
12✔
56
from pants.option.options_bootstrapper import OptionsBootstrapper
12✔
57
from pants.source import source_root
12✔
58
from pants.testutil.option_util import create_options_bootstrapper
12✔
59
from pants.util.collections import assert_single_element
12✔
60
from pants.util.contextutil import pushd, temporary_dir, temporary_file
12✔
61
from pants.util.dirutil import recursive_dirname, safe_mkdir, safe_mkdtemp, safe_open
12✔
62
from pants.util.logging import LogLevel
12✔
63
from pants.util.ordered_set import OrderedSet
12✔
64
from pants.util.strutil import softwrap
12✔
65

66

67
def logging(original_function=None, *, level: LogLevel = LogLevel.INFO):
    """Decorator that runs the wrapped callable with logging enabled, at `level` if given.

    It can be applied bare:

        ```
        @logging
        def test_function():
            ...
        ```

    ...or parameterized with a level:

        ```
        @logging(level=LogLevel.DEBUG)
        def test_function():
            ...
        ```
    """

    def _enable_logging_for(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Read the current stdout/stderr descriptors before entering the stdio contexts.
            stdout_fileno = sys.stdout.fileno()
            stderr_fileno = sys.stderr.fileno()
            with (
                temporary_dir() as tempdir,
                initialize_stdio_raw(level, False, False, {}, True, [], tempdir),
                stdin_context() as stdin,
                stdio_destination(stdin.fileno(), stdout_fileno, stderr_fileno),
            ):
                return func(*args, **kwargs)

        return wrapper

    # Bare `@logging` receives the function directly; `@logging(level=...)` receives nothing and
    # must hand back the decorator itself.
    return _enable_logging_for(original_function) if original_function else _enable_logging_for
1✔
104

105

106
@contextmanager
def engine_error(
    expected_underlying_exception: type[Exception] = Exception,
    *,
    contains: str | None = None,
    normalize_tracebacks: bool = False,
) -> Iterator[None]:
    """Context manager for tests that expect an `ExecutionError` wrapping one specific exception.

    Example:

        with engine_error(ValueError, contains="foo"):
            rule_runner.request(OutputType, [input])

    The `ExecutionError` is swallowed when all checks pass. Otherwise:

    * `AssertionError` is raised if the body did not raise an `ExecutionError`, if the single
      wrapped exception is not an instance of `expected_underlying_exception`, or if `contains`
      is given and is not a substring of the checked message.
    * `ValueError` is raised if the `ExecutionError` does not wrap exactly one exception.

    By default `contains` is checked against `str()` of the underlying exception. With
    `normalize_tracebacks=True` it is checked against the whole `ExecutionError` message after
    file locations and addresses have been replaced with fixed values.
    """
    try:
        yield
    except ExecutionError as exec_error:
        wrapped = exec_error.wrapped_exceptions
        if len(wrapped) != 1:
            formatted_errors = "\n\n".join(map(repr, wrapped))
            raise ValueError(
                softwrap(
                    f"""
                    Multiple underlying exceptions, but this helper function expected only one.
                    Use `with pytest.raises(ExecutionError) as exc` directly and inspect
                    `exc.value.wrapped_exceptions`.

                    Errors: {formatted_errors}
                    """
                )
            )
        underlying = wrapped[0]
        if not isinstance(underlying, expected_underlying_exception):
            raise AssertionError(
                softwrap(
                    f"""
                    ExecutionError occurred as expected, but the underlying exception had type
                    {type(underlying)} rather than the expected type
                    {expected_underlying_exception}:

                    {underlying}
                    """
                )
            )
        if contains is not None:
            # Pick which text the substring check runs against.
            errmsg = (
                remove_locations_from_traceback(str(exec_error))
                if normalize_tracebacks
                else str(underlying)
            )
            if contains not in errmsg:
                differ = difflib.Differ()
                diff = "\n".join(differ.compare(contains.splitlines(), errmsg.splitlines()))
                raise AssertionError(
                    softwrap(
                        f"""
                        Expected value not found in exception.

                        => Expected: {contains}

                        => Actual: {errmsg}

                        => Diff: {diff}
                        """
                    )
                )
    else:
        # The body finished without raising at all.
        raise AssertionError(
            softwrap(
                f"""
                DID NOT RAISE ExecutionError with underlying exception type
                {expected_underlying_exception}.
                """
            )
        )
187

188

189
def remove_locations_from_traceback(trace: str) -> str:
    """Return `trace` with file locations and hex addresses replaced by fixed placeholders.

    Every `"/<path>", line <N>` occurrence becomes `LOCATION-INFO`, and afterwards every
    lowercase hexadecimal literal of the form `0x...` becomes `0xEEEEEEEEE`.
    """
    without_locations = re.sub(r'"/.*", line \d+', "LOCATION-INFO", trace)
    return re.sub(r"0x[0-9a-f]+", "0xEEEEEEEEE", without_locations)
1✔
195

196

197
# -----------------------------------------------------------------------------------------------
198
# `RuleRunner`
199
# -----------------------------------------------------------------------------------------------
200

201

202
# Type variables used in this module's generic signatures (e.g. `_O` is the output type of
# `RuleRunner.request`).
_I = TypeVar("_I")
_O = TypeVar("_O")


# A global executor for Schedulers created in unit tests, which is shutdown using `atexit`. This
# allows for reusing threads, and avoids waiting for straggling tasks during teardown of each test.
EXECUTOR = PyExecutor(
    # Use the ~minimum possible parallelism since integration tests using RuleRunner will already
    # be run by Pants using an appropriate Parallelism. We must set max_threads > core_threads; so
    # 2 is the minimum, but, via trial and error, 3 minimizes test times on average.
    core_threads=1,
    max_threads=3,
)
# NOTE(review): the argument to `shutdown` is presumably a timeout in seconds — confirm against
# `PyExecutor`.
atexit.register(lambda: EXECUTOR.shutdown(5))


# Environment variable names required for locating Python interpreters, for use with RuleRunner's
# env_inherit arguments.
# TODO: This is verbose and redundant: see https://github.com/pantsbuild/pants/issues/13350.
PYTHON_BOOTSTRAP_ENV = {"PATH", "PYENV_ROOT", "HOME"}
12✔
222

223

224
@dataclass(frozen=True)
class GoalRuleResult:
    """Immutable outcome of running a goal rule: its exit code plus the captured output."""

    exit_code: int
    stdout: str
    stderr: str

    @staticmethod
    def noop() -> GoalRuleResult:
        """Return a result with exit code 0 and empty stdout and stderr."""
        empty = ""
        return GoalRuleResult(exit_code=0, stdout=empty, stderr=empty)
1✔
233

234

235
# This is not frozen because we need to update the `scheduler` when setting options.
236
@dataclass
class RuleRunner:
    """A test harness that builds a scheduler session over a temporary build root.

    Construction creates the build root directory, registers the given rules, target types,
    aliases and auxiliary goals into a `BuildConfiguration`, and opens a scheduler session.
    The remaining methods request rule outputs, run goal rules, and create or read files
    relative to that build root.
    """

    # Absolute path of the build root directory created in `__init__`.
    build_root: str
    # Replaced by `set_options()`.
    options_bootstrapper: OptionsBootstrapper
    # Merged into the `SessionValues` of every session created by `_set_new_session()`.
    extra_session_values: dict[Any, Any]
    # Passed as `max_workunit_level` when creating a session.
    max_workunit_verbosity: LogLevel
    build_config: BuildConfiguration
    # The current session; reassigned whenever options or session values change.
    scheduler: SchedulerSession
    # The caller's rules, after `QueryRule`s were rewritten for the inherent environment.
    rules: tuple[Any, ...]
    # NOTE: `environment` and `inherent_environment` are also assigned in `__init__`, but are not
    # declared as dataclass fields.

    def __init__(
        self,
        *,
        rules: Iterable | None = None,
        target_types: Iterable[type[Target]] | None = None,
        objects: dict[str, Any] | None = None,
        aliases: Iterable[BuildFileAliases] | None = None,
        context_aware_object_factories: dict[str, Any] | None = None,
        isolated_local_store: bool = False,
        preserve_tmpdirs: bool = False,
        ca_certs_path: str | None = None,
        bootstrap_args: Iterable[str] = (),
        extra_session_values: dict[Any, Any] | None = None,
        max_workunit_verbosity: LogLevel = LogLevel.DEBUG,
        inherent_environment: EnvironmentName | None = EnvironmentName(None),
        is_bootstrap: bool = False,
        auxiliary_goals: Iterable[type[AuxiliaryGoal]] | None = None,
    ) -> None:
        """Create the build root, the build configuration and the initial scheduler session.

        rules: Rules to register. When `inherent_environment` is truthy, each `QueryRule` among
            them gets `EnvironmentName` appended to its input types.
        target_types: Target types to register.
        objects: Passed as `objects` to a `BuildFileAliases` that is registered.
        aliases: Additional `BuildFileAliases` to register.
        context_aware_object_factories: Passed to the same `BuildFileAliases` as `objects`.
        isolated_local_store: If true, override the local store's `store_dir` with a fresh
            directory instead of the one derived from the global options.
        preserve_tmpdirs: If true, place the build root under a `mkdtemp` directory whose path
            is printed to stderr, and add `--keep-sandboxes=always` and
            `--local-execution-root-dir=<that directory>` to the bootstrap args.
        ca_certs_path: Forwarded to `EngineInitializer.setup_graph_extended`.
        bootstrap_args: Arguments used to create the initial options bootstrapper.
        extra_session_values: Extra entries for the session's `SessionValues`.
        max_workunit_verbosity: Used as `max_workunit_level` for sessions.
        inherent_environment: If truthy, appended to the params of `request()` and
            `run_goal_rule()`.
        is_bootstrap: Forwarded to `EngineInitializer.setup_graph_extended`.
        auxiliary_goals: Auxiliary goal types to register.
        """
        # Copy so that the caller's iterable is not mutated by the `extend` below.
        bootstrap_args = [*bootstrap_args]

        root_dir: Path | None = None
        if preserve_tmpdirs:
            root_dir = Path(mkdtemp(prefix="RuleRunner."))
            print(f"Preserving rule runner temporary directories at {root_dir}.", file=sys.stderr)
            bootstrap_args.extend(
                ["--keep-sandboxes=always", f"--local-execution-root-dir={root_dir}"]
            )
            build_root = (root_dir / "BUILD_ROOT").resolve()
            build_root.mkdir()
            self.build_root = str(build_root)
        else:
            self.build_root = os.path.realpath(safe_mkdtemp(prefix="_BUILD_ROOT"))

        # `pants_workdir` is derived from `build_root`, so it must be created after it is set.
        safe_mkdir(self.pants_workdir)
        # NOTE(review): presumably `BuildRoot` is a process-wide singleton, making this a global
        # side effect — confirm.
        BuildRoot().path = self.build_root

        def rewrite_rule_for_inherent_environment(rule):
            # Only `QueryRule`s are rewritten, and only when an inherent environment is set.
            if not inherent_environment or not isinstance(rule, QueryRule):
                return rule
            return QueryRule(rule.output_type, OrderedSet((*rule.input_types, EnvironmentName)))

        # TODO: Redesign rule registration for tests to be more ergonomic and to make this less
        #  special-cased.
        self.rules = tuple(rewrite_rule_for_inherent_environment(rule) for rule in (rules or ()))
        all_rules = (
            *self.rules,
            *build_configuration.rules(),
            *source_root.rules(),
            *options_parsing.rules(),
            *misc.rules(),
            *adhoc_binaries.rules(),
            # Many tests indirectly rely on this rule.
            generate_run_request,
            QueryRule(WrappedTarget, [WrappedTargetRequest]),
            QueryRule(AllTargets, []),
            QueryRule(UnionMembership, []),
        )
        build_config_builder = BuildConfiguration.Builder()
        build_config_builder.register_aliases(
            BuildFileAliases(
                objects=objects, context_aware_object_factories=context_aware_object_factories
            )
        )
        aliases = aliases or ()
        for build_file_aliases in aliases:
            build_config_builder.register_aliases(build_file_aliases)

        build_config_builder.register_rules("_dummy_for_test_", all_rules)
        build_config_builder.register_target_types("_dummy_for_test_", target_types or ())
        build_config_builder.register_auxiliary_goals("_dummy_for_test_", auxiliary_goals or ())
        self.build_config = build_config_builder.create()

        # These attributes are read by `_set_new_session()`, so set them before it is called.
        self.environment = CompleteEnvironmentVars({})
        self.extra_session_values = extra_session_values or {}
        self.inherent_environment = inherent_environment
        self.max_workunit_verbosity = max_workunit_verbosity

        # Change cwd and add sentinel file (BUILDROOT) so NativeOptionParser can find build_root.
        with self.pushd():
            Path("BUILDROOT").touch()
            self.options_bootstrapper = self.create_options_bootstrapper(
                args=bootstrap_args, env=None
            )
            options = self.options_bootstrapper.full_options(
                known_scope_infos=self.build_config.known_scope_infos,
                union_membership=UnionMembership.from_rules(
                    rule for rule in self.rules if isinstance(rule, UnionRule)
                ),
                allow_unknown_options=self.build_config.allow_unknown_options,
            )
            global_options = self.options_bootstrapper.bootstrap_options.for_global_scope()

        dynamic_remote_options, _ = DynamicRemoteOptions.from_options(options, self.environment)
        local_store_options = LocalStoreOptions.from_options(global_options)
        if isolated_local_store:
            # Keep the store under the preserved root when there is one, so it is preserved too.
            if root_dir:
                lmdb_store_dir = root_dir / "lmdb_store"
                lmdb_store_dir.mkdir()
                store_dir = str(lmdb_store_dir)
            else:
                store_dir = safe_mkdtemp(prefix="lmdb_store.")
            local_store_options = dataclasses.replace(local_store_options, store_dir=store_dir)

        local_execution_root_dir = global_options.local_execution_root_dir
        named_caches_dir = global_options.named_caches_dir

        self._set_new_session(
            EngineInitializer.setup_graph_extended(
                pants_ignore_patterns=GlobalOptions.compute_pants_ignore(
                    self.build_root, global_options
                ),
                use_gitignore=False,
                local_store_options=local_store_options,
                local_execution_root_dir=local_execution_root_dir,
                named_caches_dir=named_caches_dir,
                pants_workdir=self.pants_workdir,
                build_root=self.build_root,
                build_configuration=self.build_config,
                # Each Scheduler that is created borrows the global executor, which is shut down `atexit`.
                executor=EXECUTOR.to_borrowed(),
                execution_options=ExecutionOptions.from_options(
                    global_options, dynamic_remote_options
                ),
                ca_certs_path=ca_certs_path,
                engine_visualize_to=None,
                is_bootstrap=is_bootstrap,
            ).scheduler
        )

    def __repr__(self) -> str:
        """Return a short representation that shows only the build root."""
        return f"RuleRunner(build_root={self.build_root})"

    def _set_new_session(self, scheduler: Scheduler) -> None:
        """Replace `self.scheduler` with a new session created from `scheduler`.

        The session values are built from the current `options_bootstrapper`, `environment` and
        `extra_session_values`.
        """
        self.scheduler = scheduler.new_session(
            build_id="buildid_for_test",
            session_values=SessionValues(
                {
                    OptionsBootstrapper: self.options_bootstrapper,
                    CompleteEnvironmentVars: self.environment,
                    CurrentExecutingGoals: CurrentExecutingGoals(),
                    # Spread last, so extra values override the defaults above on key collision.
                    **self.extra_session_values,
                }
            ),
            max_workunit_level=self.max_workunit_verbosity,
        )

    @property
    def pants_workdir(self) -> str:
        """The path `<build_root>/.pants.d/workdir`."""
        return os.path.join(self.build_root, ".pants.d", "workdir")

    @property
    def target_types(self) -> tuple[type[Target], ...]:
        """The target types held by the build configuration."""
        return self.build_config.target_types

    @property
    def union_membership(self) -> UnionMembership:
        """An instance of `UnionMembership` with all the test's registered `UnionRule`s."""
        return self.request(UnionMembership, [])

    def new_session(self, build_id: str) -> None:
        """Mutates this RuleRunner to begin a new Session with the same Scheduler."""
        # NOTE(review): unlike `_set_new_session()`, this passes neither `session_values` nor
        # `max_workunit_level` — confirm that dropping them here is intended.
        self.scheduler = self.scheduler.scheduler.new_session(build_id)

    def request(self, output_type: type[_O], inputs: Iterable[Any]) -> _O:
        """Request a single `output_type` from the engine for the given `inputs`.

        The inherent environment, if set, is appended to the params. The request runs with the
        build root as the current working directory.
        """
        params = (
            Params(*inputs, self.inherent_environment)
            if self.inherent_environment
            else Params(*inputs)
        )
        with self.pushd():
            result = assert_single_element(self.scheduler.product_request(output_type, params))
        return cast(_O, result)

    def run_goal_rule(
        self,
        goal: type[Goal],
        *,
        global_args: Iterable[str] | None = None,
        args: Iterable[str] | None = None,
        env: Mapping[str, str] | None = None,
        env_inherit: set[str] | None = None,
    ) -> GoalRuleResult:
        """Run `goal` and return its exit code together with the captured stdout and stderr.

        The command line is `global_args`, then the goal's name, then `args`. It is applied via
        `set_options()`, which also receives `env` and `env_inherit` and starts a new session.
        """
        merged_args = (*(global_args or []), goal.name, *(args or []))
        self.set_options(merged_args, env=env, env_inherit=env_inherit)

        with self.pushd():
            raw_specs = self.options_bootstrapper.full_options_for_scopes(
                [GlobalOptions.get_scope_info(), goal.subsystem_cls.get_scope_info()],
                self.union_membership,
            ).specs
        specs = SpecsParser(root_dir=self.build_root).parse_specs(
            raw_specs, description_of_origin="RuleRunner.run_goal_rule()"
        )

        # Console output is captured in memory so that it can be returned to the caller.
        stdout, stderr = StringIO(), StringIO()
        console = Console(stdout=stdout, stderr=stderr, use_colors=False, session=self.scheduler)

        with self.pushd():
            exit_code = self.scheduler.run_goal_rule(
                goal,
                Params(
                    specs,
                    console,
                    Workspace(self.scheduler),
                    *([self.inherent_environment] if self.inherent_environment else []),
                ),
            )

        console.flush()
        return GoalRuleResult(exit_code, stdout.getvalue(), stderr.getvalue())

    @contextmanager
    def pushd(self) -> Iterator[None]:
        """Context manager that runs its body with the build root as the working directory."""
        with pushd(self.build_root):
            yield

    def create_options_bootstrapper(
        self, args: Iterable[str], env: Mapping[str, str] | None
    ) -> OptionsBootstrapper:
        """Create an `OptionsBootstrapper` from `args` and `env`."""
        return create_options_bootstrapper(args=args, env=env)

    def set_options(
        self,
        args: Iterable[str],
        *,
        env: Mapping[str, str] | None = None,
        env_inherit: set[str] | None = None,
    ) -> None:
        """Update the engine session with new options and/or environment variables.

        The environment variables will be used to set the `CompleteEnvironmentVars`, which is the
        environment variables captured by the parent Pants process. Some rules use this to be able
        to read arbitrary env vars. Any options that start with `PANTS_` will also be used to set
        options.

        Environment variables listed in `env_inherit` and not in `env` will be inherited from the test
        runner's environment (os.environ).

        This will override any previously configured values.
        """
        # Explicit `env` entries come last, so they win over inherited values.
        env = {
            **{k: os.environ[k] for k in (env_inherit or set()) if k in os.environ},
            **(env or {}),
        }
        with self.pushd():
            self.options_bootstrapper = self.create_options_bootstrapper(args=args, env=env)
        self.environment = CompleteEnvironmentVars(env)
        self._set_new_session(self.scheduler.scheduler)

    def set_session_values(
        self,
        extra_session_values: dict[Any, Any],
    ) -> None:
        """Update the engine Session with new session_values."""
        self.extra_session_values = extra_session_values
        self._set_new_session(self.scheduler.scheduler)

    def _invalidate_for(self, *relpaths: str):
        """Invalidates all files from the relpath, recursively up to the root.

        Many python operations implicitly create parent directories, so we assume that touching a
        file located below directories that do not currently exist will result in their creation.
        """
        files = {f for relpath in relpaths for f in recursive_dirname(relpath)}
        return self.scheduler.invalidate_files(files)

    def chmod(self, relpath: str | PurePath, mode: int) -> None:
        """Change the file mode and permissions.

        relpath: The relative path to the file or directory from the build root.
        mode: The file mode to set, preferably in octal representation, e.g. `mode=0o750`.
        """
        Path(self.build_root, relpath).chmod(mode)
        self._invalidate_for(str(relpath))

    def create_dir(self, relpath: str) -> str:
        """Creates a directory under the buildroot.

        :API: public

        relpath: The relative path to the directory from the build root.
        returns: The path of the directory, joined onto the build root.
        """
        path = os.path.join(self.build_root, relpath)
        safe_mkdir(path)
        self._invalidate_for(relpath)
        return path

    def _create_file(
        self, relpath: str | PurePath, contents: bytes | str = "", mode: str = "w"
    ) -> str:
        """Writes to a file under the buildroot.

        relpath: The relative path to the file from the build root.
        contents: The contents of the file, as a string or bytes - '' by default.
        mode: The mode to write to the file in - over-write by default.
        returns: The path of the file, joined onto the build root.
        """
        path = os.path.join(self.build_root, relpath)
        with safe_open(path, mode=mode) as fp:
            fp.write(contents)
        self._invalidate_for(str(relpath))
        return path

    @overload
    def write_files(self, files: Mapping[str, str | bytes]) -> tuple[str, ...]: ...

    @overload
    def write_files(self, files: Mapping[PurePath, str | bytes]) -> tuple[str, ...]: ...

    def write_files(
        self, files: Mapping[PurePath, str | bytes] | Mapping[str, str | bytes]
    ) -> tuple[str, ...]:
        """Write the files to the build root.

        :API: public

        files: A mapping of file names to contents.
        returns: A tuple of absolute file paths created.
        """
        paths = []
        for path, content in files.items():
            paths.append(
                # Bytes content is written in binary mode, and str content in text mode.
                self._create_file(path, content, mode="wb" if isinstance(content, bytes) else "w")
            )
        return tuple(paths)

    def read_file(self, file: str | PurePath, mode: str = "r") -> str | bytes:
        """Read a file that was written to the build root, useful for testing.

        Returns bytes when `mode` contains "b", and a str otherwise.
        """
        path = os.path.join(self.build_root, file)
        with safe_open(path, mode=mode) as fp:
            if "b" in mode:
                return bytes(fp.read())
            return str(fp.read())

    def make_snapshot(self, files: Mapping[str, str | bytes]) -> Snapshot:
        """Makes a snapshot from a map of file name to file content.

        :API: public
        """
        # `FileContent` is given bytes, so str content is encoded first.
        file_contents = [
            FileContent(path, content.encode() if isinstance(content, str) else content)
            for path, content in files.items()
        ]
        digest = self.request(Digest, [CreateDigest(file_contents)])
        return self.request(Snapshot, [digest])

    def make_snapshot_of_empty_files(self, files: Iterable[str]) -> Snapshot:
        """Makes a snapshot with empty content for each file.

        This is a convenience around `make_snapshot`, which allows specifying the content
        for each file.

        :API: public
        """
        return self.make_snapshot(dict.fromkeys(files, ""))

    def get_target(self, address: Address) -> Target:
        """Find the target for a given address.

        This requires that the target actually exists, i.e. that you set up its BUILD file.

        :API: public
        """
        return self.request(
            WrappedTarget,
            [WrappedTargetRequest(address, description_of_origin="RuleRunner.get_target()")],
        ).target

    def write_digest(
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
    ) -> None:
        """Write a digest to disk, relative to the test's build root.

        Access the written files by using `os.path.join(rule_runner.build_root, <relpath>)`.
        """
        native_engine.write_digest(
            self.scheduler.py_scheduler,
            self.scheduler.py_session,
            digest,
            # The native call is given an empty string when no prefix is set.
            path_prefix or "",
            clear_paths,
        )

    def run_interactive_process(self, request: InteractiveProcess) -> InteractiveProcessResult:
        """Run `request` as an interactive process from the build root and return its result.

        The process execution environment is built for the localhost platform, with no named
        environment, no docker image, and remote execution disabled.
        """
        with self.pushd():
            return native_engine.session_run_interactive_process(
                self.scheduler.py_session,
                request,
                ProcessExecutionEnvironment(
                    environment_name=None,
                    platform=Platform.create_for_localhost().value,
                    docker_image=None,
                    remote_execution=False,
                    remote_execution_extra_platform_properties=[],
                    execute_in_workspace=False,
                    keep_sandboxes="never",
                ),
            )
643

644

645
# -----------------------------------------------------------------------------------------------
646
# `run_rule_with_mocks()`
647
# -----------------------------------------------------------------------------------------------
648

649

650
def run_rule_with_mocks(
    rule: Callable[..., Coroutine[Any, Any, _O]],
    *,
    rule_args: Sequence[Any] = (),
    mock_calls: Mapping[str, Callable] | None = None,
    union_membership: UnionMembership | None = None,
    show_warnings: bool = True,
) -> _O:
    """A test helper that runs an @rule with a set of args and mocked underlying @rule invocations.

    An @rule named `my_rule` that takes one argument and invokes no other @rules (by-name or via
    `Get` requests) can be invoked like so:

    ```
    return_value = run_rule_with_mocks(my_rule, rule_args=[arg1])
    ```

    In the case of an @rule that invokes other @rules, either by name or via `Get` requests,
    the `mock_calls` argument must be provided.

    `mock_calls` is a mapping of fully-qualified rule name to the function that mocks that rule,
    and mocks out calls by name to the corresponding rules.

    So in the case of an @rule named `my_co_rule` that takes one argument and calls the @rule
    `path.to.module.list_dir` by name to produce a `Listing` from a `Dir`, the invoke might look
    like:

    ```
    return_value = run_rule_with_mocks(
      my_co_rule,
      rule_args=[arg1],
      mock_calls={
        "path.to.module.list_dir": lambda dir_subject: Listing(..),
      },
    )
    ```

    :param rule: The @rule (or plain async helper) to run. If it carries a `rule` attribute, the
        number of `rule_args` is validated against that attribute's `parameters` and the
        undecorated `func` is called.
    :param rule_args: Positional arguments passed to the rule function.
    :param mock_calls: Mapping of rule id to the callable that stands in for that rule.
    :param union_membership: Accepted for interface compatibility; it is not read by this
        function's body.
    :param show_warnings: If true, emit a warning listing every entry of `mock_calls` that was
        never invoked once the rule finishes.
    :returns: The return value of the completed @rule.
    :raises ValueError: If `rule` is an @rule and `rule_args` has the wrong number of arguments.
    :raises AssertionError: If the rule yields something other than a `Call` where one is
        expected, or calls a rule id for which no mock was provided.
    """
    mock_calls = mock_calls or {}

    task_rule = getattr(rule, "rule", None)

    func: Callable[..., Coroutine[Any, Any, _O]] | Callable[..., _O]

    # Perform additional validation on `@rule` that the correct args are provided. We don't have
    # an easy way to do this for async helper calls yet.
    if task_rule:
        if len(rule_args) != len(task_rule.parameters):
            raise ValueError(
                "Error running rule with mocks:\n"
                f"Rule {task_rule.func.__qualname__} expected to receive arguments of the "
                f"form: {task_rule.parameters}; got: {rule_args}"
            )

        # Access the original function, rather than the trampoline that we would get by calling
        # it directly.
        func = task_rule.func
    else:
        func = rule

    res = func(*(rule_args or ()))
    # A function that is neither a coroutine nor a generator has already produced its result.
    if not isinstance(res, (Coroutine, Generator)):
        return res

    # Tracks which mocks have not been hit yet, so that leftovers can be reported at the end.
    unconsumed_mock_calls = set(mock_calls.keys())

    def get(res: Any):
        """Resolve a single yielded `Call` by invoking the mock registered for its rule id."""
        if not isinstance(res, Call):
            raise AssertionError(f"Bad arg type: {res}")
        mock_call = mock_calls.get(res.rule_id)
        if mock_call is None:
            raise AssertionError(f"No mock_call provided for {res.rule_id}.")
        unconsumed_mock_calls.discard(res.rule_id)
        # NB: if the mock declares an `__implicitly` parameter, forward the raw `(dict,)` so it
        # can inspect declared types (e.g. to route polymorphic dispatch); otherwise unpack the
        # implicit values positionally.
        implicit = res.implicit_args
        if implicit and "__implicitly" in inspect.signature(mock_call).parameters:
            return mock_call(*res.args, __implicitly=(implicit,))
        return mock_call(*res.args, *implicit)

    rule_coroutine = res
    rule_input = None

    def warn_on_unconsumed_mocks():
        """Warn about mocks that were supplied but never called (when `show_warnings` is set)."""
        # Note that we used `warnings` instead of `logger.warning` because the latter may
        # get captured or swallowed by the test framework.
        if show_warnings and unconsumed_mock_calls:
            # stacklevel=3 skips this helper and `run_rule_with_mocks` itself, so the warning is
            # attributed to the caller that passed the unused mock.
            warnings.warn(f"Unconsumed mock_calls: {unconsumed_mock_calls}", stacklevel=3)

    # Drive the coroutine by hand: each yielded request is answered with the mocked result, which
    # is sent back in on the next iteration.
    while True:
        try:
            res = rule_coroutine.send(rule_input)
            if isinstance(res, Call):
                rule_input = get(res)
            elif isinstance(res, _Concurrently):
                rule_input = [get(g) for g in res.calls]
            elif type(res) in (tuple, list):
                rule_input = [get(g) for g in res]
            else:
                # Anything else that is yielded is treated as the rule's result.
                warn_on_unconsumed_mocks()
                return res  # type: ignore[no-any-return]
        except StopIteration as e:
            # Normal completion: the coroutine's return value travels on the StopIteration.
            warn_on_unconsumed_mocks()
            return e.value  # type: ignore[no-any-return]
12✔
757

758

759
@contextmanager
def stdin_context(content: bytes | str | None = None):
    """Yield an open, readable file object suitable for use as a stand-in for stdin.

    With `content=None` the yielded file is `/dev/null`. Otherwise `content` is first written to
    a temporary file (opened in binary mode when `content` is `bytes`), and that file is then
    reopened by name for reading and yielded.

    The yielded file object is closed when the context exits.
    """
    if content is None:
        # Use `with` so the handle is closed deterministically rather than leaked.
        with open("/dev/null") as devnull:
            yield devnull
    else:
        with temporary_file(binary_mode=isinstance(content, bytes)) as stdin_file:
            stdin_file.write(content)
            # Close the writer before reopening the file by name for reading.
            stdin_file.close()
            with open(stdin_file.name) as reopened:
                yield reopened
2✔
768

769

770
@contextmanager
def mock_console(
    options_bootstrapper: OptionsBootstrapper,
    *,
    stdin_content: bytes | str | None = None,
) -> Iterator[tuple[Console, StdioReader]]:
    """Yield a `Console` whose stdio is redirected, plus a reader for what it wrote.

    The global options (including whether to use colors) are computed with the working directory
    switched to the build root. Stdio is then initialized and pointed at a stdin file built from
    `stdin_content` and at two temporary files for stdout and stderr; the yielded `StdioReader`
    reads those two temporary files.
    """
    with pushd(get_buildroot()):
        global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()
        full_options = options_bootstrapper.full_options_for_scopes(
            [GlobalOptions.get_scope_info()],
            UnionMembership.empty(),
            allow_unknown_options=True,
        )
        use_colors = full_options.for_global_scope().colors

    with (
        initialize_stdio(global_bootstrap_options),
        stdin_context(stdin_content) as stdin_file,
        temporary_file(binary_mode=False) as stdout_file,
        temporary_file(binary_mode=False) as stderr_file,
        stdio_destination(
            stdin_fileno=stdin_file.fileno(),
            stdout_fileno=stdout_file.fileno(),
            stderr_fileno=stderr_file.fileno(),
        ),
    ):
        # NB: We yield a Console without overriding the destination argument, because we have
        # already done a sys.std* level replacement. The replacement is necessary in order for
        # InteractiveProcess to have native file handles to interact with.
        console = Console(use_colors=use_colors)
        reader = StdioReader(_stdout=Path(stdout_file.name), _stderr=Path(stderr_file.name))
        yield (console, reader)
806

807

808
@dataclass
class StdioReader:
    """Reads back the contents of the files that stdout and stderr were redirected to."""

    # Path of the file receiving stdout.
    _stdout: Path
    # Path of the file receiving stderr.
    _stderr: Path

    def get_stdout(self) -> str:
        """Return everything flushed to stdout up to this point."""
        stdout_text = self._stdout.read_text()
        return stdout_text

    def get_stderr(self) -> str:
        """Return everything flushed to stderr up to this point."""
        stderr_text = self._stderr.read_text()
        return stderr_text
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc