• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25443604553

06 May 2026 03:05PM UTC coverage: 92.879% (-0.04%) from 92.915%
25443604553

push

github

web-flow
[pants_ng] Scaffolding for a pants_ng mode. (#23319)

In this mode the command line is parsed as an
NG invocation, and dispatched appropriately.

Of course at the moment there are no
implementations to dispatch to. That will follow.

This does expose a new option, `pants_ng` to users. 
There is a big warning not to set it, but we're not trying
to hide that we're working on a new thing, so I am
comfortable with this.

25 of 76 new or added lines in 9 files covered. (32.89%)

1294 existing lines in 76 files now uncovered.

92234 of 99306 relevant lines covered (92.88%)

4.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/src/python/pants/testutil/rule_runner.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import atexit
12✔
7
import dataclasses
12✔
8
import difflib
12✔
9
import functools
12✔
10
import inspect
12✔
11
import os
12✔
12
import re
12✔
13
import sys
12✔
14
import warnings
12✔
15
from collections.abc import Callable, Coroutine, Generator, Iterable, Iterator, Mapping, Sequence
12✔
16
from contextlib import contextmanager
12✔
17
from dataclasses import dataclass
12✔
18
from io import StringIO
12✔
19
from pathlib import Path, PurePath
12✔
20
from tempfile import mkdtemp
12✔
21
from typing import Any, TypeVar, cast, overload
12✔
22

23
from pants.base.build_environment import get_buildroot
12✔
24
from pants.base.build_root import BuildRoot
12✔
25
from pants.base.specs_parser import SpecsParser
12✔
26
from pants.build_graph import build_configuration
12✔
27
from pants.build_graph.build_configuration import BuildConfiguration
12✔
28
from pants.build_graph.build_file_aliases import BuildFileAliases
12✔
29
from pants.core.goals.run import generate_run_request
12✔
30
from pants.core.util_rules import adhoc_binaries, misc
12✔
31
from pants.engine.addresses import Address
12✔
32
from pants.engine.console import Console
12✔
33
from pants.engine.env_vars import CompleteEnvironmentVars
12✔
34
from pants.engine.environment import EnvironmentName
12✔
35
from pants.engine.fs import CreateDigest, Digest, FileContent, Snapshot, Workspace
12✔
36
from pants.engine.goal import CurrentExecutingGoals, Goal
12✔
37
from pants.engine.internals import native_engine, options_parsing
12✔
38
from pants.engine.internals.native_engine import (
12✔
39
    ProcessExecutionEnvironment,
40
    PyExecutor,
41
    _Concurrently,
42
)
43
from pants.engine.internals.scheduler import ExecutionError, Scheduler, SchedulerSession
12✔
44
from pants.engine.internals.selectors import Call, Params
12✔
45
from pants.engine.internals.session import SessionValues
12✔
46
from pants.engine.platform import Platform
12✔
47
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
12✔
48
from pants.engine.rules import QueryRule as QueryRule
12✔
49
from pants.engine.target import AllTargets, Target, WrappedTarget, WrappedTargetRequest
12✔
50
from pants.engine.unions import UnionMembership, UnionRule
12✔
51
from pants.goal.auxiliary_goal import AuxiliaryGoal
12✔
52
from pants.init.engine_initializer import EngineInitializer
12✔
53
from pants.init.logging import initialize_stdio, initialize_stdio_raw, stdio_destination
12✔
54
from pants.option.bootstrap_options import DynamicRemoteOptions, ExecutionOptions, LocalStoreOptions
12✔
55
from pants.option.global_options import GlobalOptions
12✔
56
from pants.option.options_bootstrapper import OptionsBootstrapper
12✔
57
from pants.source import source_root
12✔
58
from pants.testutil.option_util import create_options_bootstrapper
12✔
59
from pants.util.collections import assert_single_element
12✔
60
from pants.util.contextutil import pushd, temporary_dir, temporary_file
12✔
61
from pants.util.dirutil import recursive_dirname, safe_mkdir, safe_mkdtemp, safe_open
12✔
62
from pants.util.logging import LogLevel
12✔
63
from pants.util.ordered_set import OrderedSet
12✔
64
from pants.util.strutil import softwrap
12✔
65

66

67
def logging(original_function=None, *, level: LogLevel = LogLevel.INFO):
12✔
68
    """A decorator that enables logging (optionally at the given level).
69

70
    May be used without a parameter list:
71

72
        ```
73
        @logging
74
        def test_function():
75
            ...
76
        ```
77

78
    ...or with a level argument:
79

80
        ```
81
        @logging(level=LogLevel.DEBUG)
82
        def test_function():
83
            ...
84
        ```
85
    """
86

87
    def _decorate(func):
9✔
88
        @functools.wraps(func)
9✔
89
        def wrapper(*args, **kwargs):
9✔
90
            stdout_fileno, stderr_fileno = sys.stdout.fileno(), sys.stderr.fileno()
9✔
91
            with (
9✔
92
                temporary_dir() as tempdir,
93
                initialize_stdio_raw(level, False, False, {}, True, [], tempdir),
94
                stdin_context() as stdin,
95
                stdio_destination(stdin.fileno(), stdout_fileno, stderr_fileno),
96
            ):
97
                return func(*args, **kwargs)
9✔
98

99
        return wrapper
9✔
100

101
    if original_function:
9✔
102
        return _decorate(original_function)
8✔
103
    return _decorate
1✔
104

105

106
@contextmanager
12✔
107
def engine_error(
12✔
108
    expected_underlying_exception: type[Exception] = Exception,
109
    *,
110
    contains: str | None = None,
111
    normalize_tracebacks: bool = False,
112
) -> Iterator[None]:
113
    """A context manager to catch `ExecutionError`s in tests and check that the underlying exception
114
    is expected.
115

116
    Use like this:
117

118
        with engine_error(ValueError, contains="foo"):
119
            rule_runner.request(OutputType, [input])
120

121
    Will raise AssertionError if no ExecutionError occurred.
122

123
    Set `normalize_tracebacks=True` to replace file locations and addresses in the error message
124
    with fixed values for testability, and check `contains` against the `ExecutionError` message
125
    instead of the underlying error only.
126
    """
127
    try:
10✔
128
        yield
10✔
129
    except ExecutionError as exec_error:
10✔
130
        if not len(exec_error.wrapped_exceptions) == 1:
10✔
131
            formatted_errors = "\n\n".join(repr(e) for e in exec_error.wrapped_exceptions)
×
132
            raise ValueError(
×
133
                softwrap(
134
                    f"""
135
                    Multiple underlying exceptions, but this helper function expected only one.
136
                    Use `with pytest.raises(ExecutionError) as exc` directly and inspect
137
                    `exc.value.wrapped_exceptions`.
138

139
                    Errors: {formatted_errors}
140
                    """
141
                )
142
            )
143
        underlying = exec_error.wrapped_exceptions[0]
10✔
144
        if not isinstance(underlying, expected_underlying_exception):
10✔
145
            raise AssertionError(
×
146
                softwrap(
147
                    f"""
148
                    ExecutionError occurred as expected, but the underlying exception had type
149
                    {type(underlying)} rather than the expected type
150
                    {expected_underlying_exception}:
151

152
                    {underlying}
153
                    """
154
                )
155
            )
156
        if contains is not None:
10✔
157
            if normalize_tracebacks:
9✔
158
                errmsg = remove_locations_from_traceback(str(exec_error))
1✔
159
            else:
160
                errmsg = str(underlying)
9✔
161
            if contains not in errmsg:
9✔
162
                diff = "\n".join(
×
163
                    difflib.Differ().compare(contains.splitlines(), errmsg.splitlines())
164
                )
165
                raise AssertionError(
×
166
                    softwrap(
167
                        f"""
168
                        Expected value not found in exception.
169

170
                        => Expected: {contains}
171

172
                        => Actual: {errmsg}
173

174
                        => Diff: {diff}
175
                        """
176
                    )
177
                )
178
    else:
UNCOV
179
        raise AssertionError(
1✔
180
            softwrap(
181
                f"""
182
                DID NOT RAISE ExecutionError with underlying exception type
183
                {expected_underlying_exception}.
184
                """
185
            )
186
        )
187

188

189
def remove_locations_from_traceback(trace: str) -> str:
12✔
190
    location_pattern = re.compile(r'"/.*", line \d+')
1✔
191
    address_pattern = re.compile(r"0x[0-9a-f]+")
1✔
192
    new_trace = location_pattern.sub("LOCATION-INFO", trace)
1✔
193
    new_trace = address_pattern.sub("0xEEEEEEEEE", new_trace)
1✔
194
    return new_trace
1✔
195

196

197
# -----------------------------------------------------------------------------------------------
198
# `RuleRunner`
199
# -----------------------------------------------------------------------------------------------
200

201

202
_I = TypeVar("_I")
12✔
203
_O = TypeVar("_O")
12✔
204

205

206
# A global executor for Schedulers created in unit tests, which is shutdown using `atexit`. This
207
# allows for reusing threads, and avoids waiting for straggling tasks during teardown of each test.
208
EXECUTOR = PyExecutor(
12✔
209
    # Use the ~minimum possible parallelism since integration tests using RuleRunner will already
210
    # be run by Pants using an appropriate Parallelism. We must set max_threads > core_threads; so
211
    # 2 is the minimum, but, via trial and error, 3 minimizes test times on average.
212
    core_threads=1,
213
    max_threads=3,
214
)
215
atexit.register(lambda: EXECUTOR.shutdown(5))
12✔
216

217

218
# Environment variable names required for locating Python interpreters, for use with RuleRunner's
219
# env_inherit arguments.
220
# TODO: This is verbose and redundant: see https://github.com/pantsbuild/pants/issues/13350.
221
PYTHON_BOOTSTRAP_ENV = {"PATH", "PYENV_ROOT", "HOME"}
12✔
222

223

224
@dataclass(frozen=True)
12✔
225
class GoalRuleResult:
12✔
226
    exit_code: int
12✔
227
    stdout: str
12✔
228
    stderr: str
12✔
229

230
    @staticmethod
12✔
231
    def noop() -> GoalRuleResult:
12✔
UNCOV
232
        return GoalRuleResult(0, stdout="", stderr="")
1✔
233

234

235
# This is not frozen because we need to update the `scheduler` when setting options.
236
@dataclass
12✔
237
class RuleRunner:
12✔
238
    build_root: str
12✔
239
    options_bootstrapper: OptionsBootstrapper
12✔
240
    extra_session_values: dict[Any, Any]
12✔
241
    max_workunit_verbosity: LogLevel
12✔
242
    build_config: BuildConfiguration
12✔
243
    scheduler: SchedulerSession
12✔
244
    rules: tuple[Any, ...]
12✔
245

246
    def __init__(
12✔
247
        self,
248
        *,
249
        rules: Iterable | None = None,
250
        target_types: Iterable[type[Target]] | None = None,
251
        objects: dict[str, Any] | None = None,
252
        aliases: Iterable[BuildFileAliases] | None = None,
253
        context_aware_object_factories: dict[str, Any] | None = None,
254
        isolated_local_store: bool = False,
255
        preserve_tmpdirs: bool = False,
256
        ca_certs_path: str | None = None,
257
        bootstrap_args: Iterable[str] = (),
258
        extra_session_values: dict[Any, Any] | None = None,
259
        max_workunit_verbosity: LogLevel = LogLevel.DEBUG,
260
        inherent_environment: EnvironmentName | None = EnvironmentName(None),
261
        is_bootstrap: bool = False,
262
        auxiliary_goals: Iterable[type[AuxiliaryGoal]] | None = None,
263
    ) -> None:
264
        bootstrap_args = [*bootstrap_args]
12✔
265

266
        root_dir: Path | None = None
12✔
267
        if preserve_tmpdirs:
12✔
268
            root_dir = Path(mkdtemp(prefix="RuleRunner."))
8✔
269
            print(f"Preserving rule runner temporary directories at {root_dir}.", file=sys.stderr)
8✔
270
            bootstrap_args.extend(
8✔
271
                ["--keep-sandboxes=always", f"--local-execution-root-dir={root_dir}"]
272
            )
273
            build_root = (root_dir / "BUILD_ROOT").resolve()
8✔
274
            build_root.mkdir()
8✔
275
            self.build_root = str(build_root)
8✔
276
        else:
277
            self.build_root = os.path.realpath(safe_mkdtemp(prefix="_BUILD_ROOT"))
12✔
278

279
        safe_mkdir(self.pants_workdir)
12✔
280
        BuildRoot().path = self.build_root
12✔
281

282
        def rewrite_rule_for_inherent_environment(rule):
12✔
283
            if not inherent_environment or not isinstance(rule, QueryRule):
12✔
284
                return rule
12✔
285
            return QueryRule(rule.output_type, OrderedSet((*rule.input_types, EnvironmentName)))
12✔
286

287
        # TODO: Redesign rule registration for tests to be more ergonomic and to make this less
288
        #  special-cased.
289
        self.rules = tuple(rewrite_rule_for_inherent_environment(rule) for rule in (rules or ()))
12✔
290
        all_rules = (
12✔
291
            *self.rules,
292
            *build_configuration.rules(),
293
            *source_root.rules(),
294
            *options_parsing.rules(),
295
            *misc.rules(),
296
            *adhoc_binaries.rules(),
297
            # Many tests indirectly rely on this rule.
298
            generate_run_request,
299
            QueryRule(WrappedTarget, [WrappedTargetRequest]),
300
            QueryRule(AllTargets, []),
301
            QueryRule(UnionMembership, []),
302
        )
303
        build_config_builder = BuildConfiguration.Builder()
12✔
304
        build_config_builder.register_aliases(
12✔
305
            BuildFileAliases(
306
                objects=objects, context_aware_object_factories=context_aware_object_factories
307
            )
308
        )
309
        aliases = aliases or ()
12✔
310
        for build_file_aliases in aliases:
12✔
311
            build_config_builder.register_aliases(build_file_aliases)
1✔
312

313
        build_config_builder.register_rules("_dummy_for_test_", all_rules)
12✔
314
        build_config_builder.register_target_types("_dummy_for_test_", target_types or ())
12✔
315
        build_config_builder.register_auxiliary_goals("_dummy_for_test_", auxiliary_goals or ())
12✔
316
        self.build_config = build_config_builder.create()
12✔
317

318
        self.environment = CompleteEnvironmentVars({})
12✔
319
        self.extra_session_values = extra_session_values or {}
12✔
320
        self.inherent_environment = inherent_environment
12✔
321
        self.max_workunit_verbosity = max_workunit_verbosity
12✔
322

323
        # Change cwd and add sentinel file (BUILDROOT) so NativeOptionParser can find build_root.
324
        with self.pushd():
12✔
325
            Path("BUILDROOT").touch()
12✔
326
            self.options_bootstrapper = self.create_options_bootstrapper(
12✔
327
                args=bootstrap_args, env=None
328
            )
329
            options = self.options_bootstrapper.full_options(
12✔
330
                known_scope_infos=self.build_config.known_scope_infos,
331
                union_membership=UnionMembership.from_rules(
332
                    rule for rule in self.rules if isinstance(rule, UnionRule)
333
                ),
334
                allow_unknown_options=self.build_config.allow_unknown_options,
335
            )
336
            global_options = self.options_bootstrapper.bootstrap_options.for_global_scope()
12✔
337

338
        dynamic_remote_options, _ = DynamicRemoteOptions.from_options(options, self.environment)
12✔
339
        local_store_options = LocalStoreOptions.from_options(global_options)
12✔
340
        if isolated_local_store:
12✔
341
            if root_dir:
4✔
342
                lmdb_store_dir = root_dir / "lmdb_store"
×
343
                lmdb_store_dir.mkdir()
×
344
                store_dir = str(lmdb_store_dir)
×
345
            else:
346
                store_dir = safe_mkdtemp(prefix="lmdb_store.")
4✔
347
            local_store_options = dataclasses.replace(local_store_options, store_dir=store_dir)
4✔
348

349
        local_execution_root_dir = global_options.local_execution_root_dir
12✔
350
        named_caches_dir = global_options.named_caches_dir
12✔
351

352
        self._set_new_session(
12✔
353
            EngineInitializer.setup_graph_extended(
354
                pants_ignore_patterns=GlobalOptions.compute_pants_ignore(
355
                    self.build_root, global_options
356
                ),
357
                use_gitignore=False,
358
                local_store_options=local_store_options,
359
                local_execution_root_dir=local_execution_root_dir,
360
                named_caches_dir=named_caches_dir,
361
                pants_workdir=self.pants_workdir,
362
                build_root=self.build_root,
363
                build_configuration=self.build_config,
364
                # Each Scheduler that is created borrows the global executor, which is shut down `atexit`.
365
                executor=EXECUTOR.to_borrowed(),
366
                execution_options=ExecutionOptions.from_options(
367
                    global_options, dynamic_remote_options
368
                ),
369
                ca_certs_path=ca_certs_path,
370
                engine_visualize_to=None,
371
                is_bootstrap=is_bootstrap,
372
            ).scheduler
373
        )
374

375
    def __repr__(self) -> str:
12✔
UNCOV
376
        return f"RuleRunner(build_root={self.build_root})"
1✔
377

378
    def _set_new_session(self, scheduler: Scheduler) -> None:
12✔
379
        self.scheduler = scheduler.new_session(
12✔
380
            build_id="buildid_for_test",
381
            session_values=SessionValues(
382
                {
383
                    OptionsBootstrapper: self.options_bootstrapper,
384
                    CompleteEnvironmentVars: self.environment,
385
                    CurrentExecutingGoals: CurrentExecutingGoals(),
386
                    **self.extra_session_values,
387
                }
388
            ),
389
            max_workunit_level=self.max_workunit_verbosity,
390
        )
391

392
    @property
12✔
393
    def pants_workdir(self) -> str:
12✔
394
        return os.path.join(self.build_root, ".pants.d", "workdir")
12✔
395

396
    @property
12✔
397
    def target_types(self) -> tuple[type[Target], ...]:
12✔
398
        return self.build_config.target_types
1✔
399

400
    @property
12✔
401
    def union_membership(self) -> UnionMembership:
12✔
402
        """An instance of `UnionMembership` with all the test's registered `UnionRule`s."""
403
        return self.request(UnionMembership, [])
12✔
404

405
    def new_session(self, build_id: str) -> None:
12✔
406
        """Mutates this RuleRunner to begin a new Session with the same Scheduler."""
407
        self.scheduler = self.scheduler.scheduler.new_session(build_id)
5✔
408

409
    def request(self, output_type: type[_O], inputs: Iterable[Any]) -> _O:
12✔
410
        params = (
12✔
411
            Params(*inputs, self.inherent_environment)
412
            if self.inherent_environment
413
            else Params(*inputs)
414
        )
415
        with self.pushd():
12✔
416
            result = assert_single_element(self.scheduler.product_request(output_type, params))
12✔
417
        return cast(_O, result)
12✔
418

419
    def run_goal_rule(
12✔
420
        self,
421
        goal: type[Goal],
422
        *,
423
        global_args: Iterable[str] | None = None,
424
        args: Iterable[str] | None = None,
425
        env: Mapping[str, str] | None = None,
426
        env_inherit: set[str] | None = None,
427
    ) -> GoalRuleResult:
428
        merged_args = (*(global_args or []), goal.name, *(args or []))
12✔
429
        self.set_options(merged_args, env=env, env_inherit=env_inherit)
12✔
430

431
        with self.pushd():
12✔
432
            raw_specs = self.options_bootstrapper.full_options_for_scopes(
12✔
433
                [GlobalOptions.get_scope_info(), goal.subsystem_cls.get_scope_info()],
434
                self.union_membership,
435
            ).specs
436
        specs = SpecsParser(root_dir=self.build_root).parse_specs(
12✔
437
            raw_specs, description_of_origin="RuleRunner.run_goal_rule()"
438
        )
439

440
        stdout, stderr = StringIO(), StringIO()
12✔
441
        console = Console(stdout=stdout, stderr=stderr, use_colors=False, session=self.scheduler)
12✔
442

443
        with self.pushd():
12✔
444
            exit_code = self.scheduler.run_goal_rule(
12✔
445
                goal,
446
                Params(
447
                    specs,
448
                    console,
449
                    Workspace(self.scheduler),
450
                    *([self.inherent_environment] if self.inherent_environment else []),
451
                ),
452
            )
453

454
        console.flush()
12✔
455
        return GoalRuleResult(exit_code, stdout.getvalue(), stderr.getvalue())
12✔
456

457
    @contextmanager
12✔
458
    def pushd(self):
12✔
459
        with pushd(self.build_root):
12✔
460
            yield
12✔
461

462
    def create_options_bootstrapper(
12✔
463
        self, args: Iterable[str], env: Mapping[str, str] | None
464
    ) -> OptionsBootstrapper:
465
        return create_options_bootstrapper(args=args, env=env)
12✔
466

467
    def set_options(
12✔
468
        self,
469
        args: Iterable[str],
470
        *,
471
        env: Mapping[str, str] | None = None,
472
        env_inherit: set[str] | None = None,
473
    ) -> None:
474
        """Update the engine session with new options and/or environment variables.
475

476
        The environment variables will be used to set the `CompleteEnvironmentVars`, which is the
477
        environment variables captured by the parent Pants process. Some rules use this to be able
478
        to read arbitrary env vars. Any options that start with `PANTS_` will also be used to set
479
        options.
480

481
        Environment variables listed in `env_inherit` and not in `env` will be inherited from the test
482
        runner's environment (os.environ)
483

484
        This will override any previously configured values.
485
        """
486
        env = {
12✔
487
            **{k: os.environ[k] for k in (env_inherit or set()) if k in os.environ},
488
            **(env or {}),
489
        }
490
        with self.pushd():
12✔
491
            self.options_bootstrapper = self.create_options_bootstrapper(args=args, env=env)
12✔
492
        self.environment = CompleteEnvironmentVars(env)
12✔
493
        self._set_new_session(self.scheduler.scheduler)
12✔
494

495
    def set_session_values(
12✔
496
        self,
497
        extra_session_values: dict[Any, Any],
498
    ) -> None:
499
        """Update the engine Session with new session_values."""
500
        self.extra_session_values = extra_session_values
4✔
501
        self._set_new_session(self.scheduler.scheduler)
4✔
502

503
    def _invalidate_for(self, *relpaths: str):
12✔
504
        """Invalidates all files from the relpath, recursively up to the root.
505

506
        Many python operations implicitly create parent directories, so we assume that touching a
507
        file located below directories that do not currently exist will result in their creation.
508
        """
509
        files = {f for relpath in relpaths for f in recursive_dirname(relpath)}
12✔
510
        return self.scheduler.invalidate_files(files)
12✔
511

512
    def chmod(self, relpath: str | PurePath, mode: int) -> None:
12✔
513
        """Change the file mode and permissions.
514

515
        relpath: The relative path to the file or directory from the build root.
516
        mode: The file mode to set, preferable in octal representation, e.g. `mode=0o750`.
517
        """
518
        Path(self.build_root, relpath).chmod(mode)
2✔
519
        self._invalidate_for(str(relpath))
2✔
520

521
    def create_dir(self, relpath: str) -> str:
12✔
522
        """Creates a directory under the buildroot.
523

524
        :API: public
525

526
        relpath: The relative path to the directory from the build root.
527
        """
528
        path = os.path.join(self.build_root, relpath)
3✔
529
        safe_mkdir(path)
3✔
530
        self._invalidate_for(relpath)
3✔
531
        return path
3✔
532

533
    def _create_file(
12✔
534
        self, relpath: str | PurePath, contents: bytes | str = "", mode: str = "w"
535
    ) -> str:
536
        """Writes to a file under the buildroot.
537

538
        relpath: The relative path to the file from the build root.
539
        contents: A string containing the contents of the file - '' by default..
540
        mode: The mode to write to the file in - over-write by default.
541
        """
542
        path = os.path.join(self.build_root, relpath)
12✔
543
        with safe_open(path, mode=mode) as fp:
12✔
544
            fp.write(contents)
12✔
545
        self._invalidate_for(str(relpath))
12✔
546
        return path
12✔
547

548
    @overload
549
    def write_files(self, files: Mapping[str, str | bytes]) -> tuple[str, ...]: ...
550

551
    @overload
552
    def write_files(self, files: Mapping[PurePath, str | bytes]) -> tuple[str, ...]: ...
553

554
    def write_files(
12✔
555
        self, files: Mapping[PurePath, str | bytes] | Mapping[str, str | bytes]
556
    ) -> tuple[str, ...]:
557
        """Write the files to the build root.
558

559
        :API: public
560

561
        files: A mapping of file names to contents.
562
        returns: A tuple of absolute file paths created.
563
        """
564
        paths = []
12✔
565
        for path, content in files.items():
12✔
566
            paths.append(
12✔
567
                self._create_file(path, content, mode="wb" if isinstance(content, bytes) else "w")
568
            )
569
        return tuple(paths)
12✔
570

571
    def read_file(self, file: str | PurePath, mode: str = "r") -> str | bytes:
12✔
572
        """Read a file that was written to the build root, useful for testing."""
573
        path = os.path.join(self.build_root, file)
3✔
574
        with safe_open(path, mode=mode) as fp:
3✔
575
            if "b" in mode:
3✔
576
                return bytes(fp.read())
×
577
            return str(fp.read())
3✔
578

579
    def make_snapshot(self, files: Mapping[str, str | bytes]) -> Snapshot:
12✔
580
        """Makes a snapshot from a map of file name to file content.
581

582
        :API: public
583
        """
584
        file_contents = [
12✔
585
            FileContent(path, content.encode() if isinstance(content, str) else content)
586
            for path, content in files.items()
587
        ]
588
        digest = self.request(Digest, [CreateDigest(file_contents)])
12✔
589
        return self.request(Snapshot, [digest])
12✔
590

591
    def make_snapshot_of_empty_files(self, files: Iterable[str]) -> Snapshot:
12✔
592
        """Makes a snapshot with empty content for each file.
593

594
        This is a convenience around `TestBase.make_snapshot`, which allows specifying the content
595
        for each file.
596

597
        :API: public
598
        """
599
        return self.make_snapshot(dict.fromkeys(files, ""))
3✔
600

601
    def get_target(self, address: Address) -> Target:
12✔
602
        """Find the target for a given address.
603

604
        This requires that the target actually exists, i.e. that you set up its BUILD file.
605

606
        :API: public
607
        """
608
        return self.request(
12✔
609
            WrappedTarget,
610
            [WrappedTargetRequest(address, description_of_origin="RuleRunner.get_target()")],
611
        ).target
612

613
    def write_digest(
12✔
614
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
615
    ) -> None:
616
        """Write a digest to disk, relative to the test's build root.
617

618
        Access the written files by using `os.path.join(rule_runner.build_root, <relpath>)`.
619
        """
620
        native_engine.write_digest(
8✔
621
            self.scheduler.py_scheduler,
622
            self.scheduler.py_session,
623
            digest,
624
            path_prefix or "",
625
            clear_paths,
626
        )
627

628
    def run_interactive_process(self, request: InteractiveProcess) -> InteractiveProcessResult:
12✔
629
        with self.pushd():
9✔
630
            return native_engine.session_run_interactive_process(
9✔
631
                self.scheduler.py_session,
632
                request,
633
                ProcessExecutionEnvironment(
634
                    environment_name=None,
635
                    platform=Platform.create_for_localhost().value,
636
                    docker_image=None,
637
                    remote_execution=False,
638
                    remote_execution_extra_platform_properties=[],
639
                    execute_in_workspace=False,
640
                    keep_sandboxes="never",
641
                ),
642
            )
643

644

645
# -----------------------------------------------------------------------------------------------
646
# `run_rule_with_mocks()`
647
# -----------------------------------------------------------------------------------------------
648

649

650
def run_rule_with_mocks(
12✔
651
    rule: Callable[..., Coroutine[Any, Any, _O]],
652
    *,
653
    rule_args: Sequence[Any] = (),
654
    mock_calls: Mapping[str, Callable] | None = None,
655
    union_membership: UnionMembership | None = None,
656
    show_warnings: bool = True,
657
) -> _O:
658
    """A test helper that runs an @rule with a set of args and mocked underlying @rule invocations.
659

660
    An @rule named `my_rule` that takes one argument and invokes no other @rules (by-name  or via
661
    `Get` requests) can be invoked like so:
662

663
    ```
664
    return_value = run_rule_with_mocks(my_rule, rule_args=[arg1])
665
    ```
666

667
    In the case of an @rule that invokes other @rules, either by name or via `Get` requests,
668
    the `mock_calls` argument must be provided.
669

670
    `mock_calls` is a mapping of fully-qualified rule name to the function that mocks that rule,
671
    and mocks out calls by name to the corresponding rules.
672

673
    So in the case of an @rule named `my_co_rule` that takes one argument and calls the @rule
674
    `path.to.module.list_dir` by name to produce a `Listing` from a `Dir`, the invoke might look
675
    like:
676

677
    ```
678
    return_value = run_rule_with_mocks(
679
      my_co_rule,
680
      rule_args=[arg1],
681
      mock_calls={
682
        "path.to.module.list_dir": lambda dir_subject: Listing(..),
683
      },
684
    )
685
    ```
686

687
    :returns: The return value of the completed @rule.
688
    """
689
    mock_calls = mock_calls or {}
12✔
690

691
    task_rule = getattr(rule, "rule", None)
12✔
692

693
    func: Callable[..., Coroutine[Any, Any, _O]] | Callable[..., _O]
694

695
    # Perform additional validation on `@rule` that the correct args are provided. We don't have
696
    # an easy way to do this for async helper calls yet.
697
    if task_rule:
12✔
698
        if len(rule_args) != len(task_rule.parameters):
12✔
699
            raise ValueError(
×
700
                "Error running rule with mocks:\n"
701
                f"Rule {task_rule.func.__qualname__} expected to receive arguments of the "
702
                f"form: {task_rule.parameters}; got: {rule_args}"
703
            )
704

705
        # Access the original function, rather than the trampoline that we would get by calling
706
        # it directly.
707
        func = task_rule.func
12✔
708
    else:
709
        func = rule
3✔
710

711
    res = func(*(rule_args or ()))
12✔
712
    if not isinstance(res, (Coroutine, Generator)):
12✔
713
        return res
×
714

715
    unconsumed_mock_calls = set(mock_calls.keys())
12✔
716

717
    def get(res: Any):
12✔
718
        if not isinstance(res, Call):
12✔
719
            raise AssertionError(f"Bad arg type: {res}")
×
720
        mock_call = mock_calls.get(res.rule_id)
12✔
721
        if mock_call is None:
12✔
722
            raise AssertionError(f"No mock_call provided for {res.rule_id}.")
×
723
        unconsumed_mock_calls.discard(res.rule_id)
12✔
724
        # NB: if the mock declares an `__implicitly` parameter, forward the raw `(dict,)` so it
725
        # can inspect declared types (e.g. to route polymorphic dispatch); otherwise unpack the
726
        # implicit values positionally.
727
        implicit = res.implicit_args
12✔
728
        if implicit and "__implicitly" in inspect.signature(mock_call).parameters:
12✔
729
            return mock_call(*res.args, __implicitly=(implicit,))
3✔
730
        return mock_call(*res.args, *implicit)
12✔
731

732
    rule_coroutine = res
12✔
733
    rule_input = None
12✔
734

735
    def warn_on_unconsumed_mocks():
12✔
736
        # Note that we used `warnings` instead of `logger.warning` because the latter may
737
        # get captured or swallowed by the test framework.
738
        if show_warnings:
12✔
739
            if unconsumed_mock_calls:
12✔
740
                warnings.warn(f"Unconsumed mock_calls: {unconsumed_mock_calls}")
9✔
741

742
    while True:
12✔
743
        try:
12✔
744
            res = rule_coroutine.send(rule_input)
12✔
745
            if isinstance(res, Call):
12✔
746
                rule_input = get(res)
12✔
747
            elif isinstance(res, _Concurrently):
6✔
748
                rule_input = [get(g) for g in res.calls]
6✔
749
            elif type(res) in (tuple, list):
×
750
                rule_input = [get(g) for g in res]
×
751
            else:
752
                warn_on_unconsumed_mocks()
×
753
                return res  # type: ignore[no-any-return]
×
754
        except StopIteration as e:
12✔
755
            warn_on_unconsumed_mocks()
12✔
756
            return e.value  # type: ignore[no-any-return]
12✔
757

758

759
@contextmanager
12✔
760
def stdin_context(content: bytes | str | None = None):
12✔
761
    if content is None:
12✔
762
        yield open("/dev/null")
12✔
763
    else:
764
        with temporary_file(binary_mode=isinstance(content, bytes)) as stdin_file:
2✔
765
            stdin_file.write(content)
2✔
766
            stdin_file.close()
2✔
767
            yield open(stdin_file.name)
2✔
768

769

770
@contextmanager
12✔
771
def mock_console(
12✔
772
    options_bootstrapper: OptionsBootstrapper,
773
    *,
774
    stdin_content: bytes | str | None = None,
775
) -> Iterator[tuple[Console, StdioReader]]:
776
    with pushd(get_buildroot()):
12✔
777
        global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()
12✔
778
        colors = (
12✔
779
            options_bootstrapper.full_options_for_scopes(
780
                [GlobalOptions.get_scope_info()],
781
                UnionMembership.empty(),
782
                allow_unknown_options=True,
783
            )
784
            .for_global_scope()
785
            .colors
786
        )
787

788
    with (
12✔
789
        initialize_stdio(global_bootstrap_options),
790
        stdin_context(stdin_content) as stdin,
791
        temporary_file(binary_mode=False) as stdout,
792
        temporary_file(binary_mode=False) as stderr,
793
        stdio_destination(
794
            stdin_fileno=stdin.fileno(),
795
            stdout_fileno=stdout.fileno(),
796
            stderr_fileno=stderr.fileno(),
797
        ),
798
    ):
799
        # NB: We yield a Console without overriding the destination argument, because we have
800
        # already done a sys.std* level replacement. The replacement is necessary in order for
801
        # InteractiveProcess to have native file handles to interact with.
802
        yield (
12✔
803
            Console(use_colors=colors),
804
            StdioReader(_stdout=Path(stdout.name), _stderr=Path(stderr.name)),
805
        )
806

807

808
@dataclass
12✔
809
class StdioReader:
12✔
810
    _stdout: Path
12✔
811
    _stderr: Path
12✔
812

813
    def get_stdout(self) -> str:
12✔
814
        """Return all data that has been flushed to stdout so far."""
815
        return self._stdout.read_text()
11✔
816

817
    def get_stderr(self) -> str:
12✔
818
        """Return all data that has been flushed to stderr so far."""
819
        return self._stderr.read_text()
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc