• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26080722777

19 May 2026 06:37AM UTC coverage: 52.106% (-11.5%) from 63.597%
26080722777

Pull #23250

github

web-flow
Merge 63ec06323 into 2693df832
Pull Request #23250: Feature: Add generic option to docker image

12 of 50 new or added lines in 3 files covered. (24.0%)

5382 existing lines in 201 files now uncovered.

32053 of 61515 relevant lines covered (52.11%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.53
/src/python/pants/testutil/rule_runner.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import atexit
2✔
7
import dataclasses
2✔
8
import difflib
2✔
9
import functools
2✔
10
import inspect
2✔
11
import os
2✔
12
import re
2✔
13
import sys
2✔
14
import warnings
2✔
15
from collections.abc import Callable, Coroutine, Generator, Iterable, Iterator, Mapping, Sequence
2✔
16
from contextlib import contextmanager
2✔
17
from dataclasses import dataclass
2✔
18
from io import StringIO
2✔
19
from pathlib import Path, PurePath
2✔
20
from tempfile import mkdtemp
2✔
21
from typing import Any, TypeVar, cast, overload
2✔
22

23
from pants.base.build_environment import get_buildroot
2✔
24
from pants.base.build_root import BuildRoot
2✔
25
from pants.base.specs_parser import SpecsParser
2✔
26
from pants.build_graph import build_configuration
2✔
27
from pants.build_graph.build_configuration import BuildConfiguration
2✔
28
from pants.build_graph.build_file_aliases import BuildFileAliases
2✔
29
from pants.core.goals.run import generate_run_request
2✔
30
from pants.core.util_rules import adhoc_binaries, misc
2✔
31
from pants.engine.addresses import Address
2✔
32
from pants.engine.console import Console
2✔
33
from pants.engine.env_vars import CompleteEnvironmentVars
2✔
34
from pants.engine.environment import EnvironmentName
2✔
35
from pants.engine.fs import CreateDigest, Digest, FileContent, Snapshot, Workspace
2✔
36
from pants.engine.goal import CurrentExecutingGoals, Goal
2✔
37
from pants.engine.internals import native_engine, options_parsing
2✔
38
from pants.engine.internals.native_engine import (
2✔
39
    ProcessExecutionEnvironment,
40
    PyExecutor,
41
    _Concurrently,
42
)
43
from pants.engine.internals.scheduler import ExecutionError, Scheduler, SchedulerSession
2✔
44
from pants.engine.internals.selectors import Call, Params
2✔
45
from pants.engine.internals.session import SessionValues
2✔
46
from pants.engine.platform import Platform
2✔
47
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
2✔
48
from pants.engine.rules import QueryRule as QueryRule
2✔
49
from pants.engine.target import AllTargets, Target, WrappedTarget, WrappedTargetRequest
2✔
50
from pants.engine.unions import UnionMembership, UnionRule
2✔
51
from pants.goal.auxiliary_goal import AuxiliaryGoal
2✔
52
from pants.init.engine_initializer import EngineInitializer
2✔
53
from pants.init.logging import initialize_stdio, initialize_stdio_raw, stdio_destination
2✔
54
from pants.option.bootstrap_options import DynamicRemoteOptions, ExecutionOptions, LocalStoreOptions
2✔
55
from pants.option.global_options import GlobalOptions
2✔
56
from pants.option.options_bootstrapper import OptionsBootstrapper
2✔
57
from pants.source import source_root
2✔
58
from pants.testutil.option_util import create_options_bootstrapper
2✔
59
from pants.util.collections import assert_single_element
2✔
60
from pants.util.contextutil import pushd, temporary_dir, temporary_file
2✔
61
from pants.util.dirutil import recursive_dirname, safe_mkdir, safe_mkdtemp, safe_open
2✔
62
from pants.util.logging import LogLevel
2✔
63
from pants.util.ordered_set import OrderedSet
2✔
64
from pants.util.strutil import softwrap
2✔
65

66

67
def logging(original_function=None, *, level: LogLevel = LogLevel.INFO):
2✔
68
    """A decorator that enables logging (optionally at the given level).
69

70
    May be used without a parameter list:
71

72
        ```
73
        @logging
74
        def test_function():
75
            ...
76
        ```
77

78
    ...or with a level argument:
79

80
        ```
81
        @logging(level=LogLevel.DEBUG)
82
        def test_function():
83
            ...
84
        ```
85
    """
86

87
    def _decorate(func):
2✔
88
        @functools.wraps(func)
2✔
89
        def wrapper(*args, **kwargs):
2✔
90
            stdout_fileno, stderr_fileno = sys.stdout.fileno(), sys.stderr.fileno()
2✔
91
            with (
2✔
92
                temporary_dir() as tempdir,
93
                initialize_stdio_raw(level, False, False, {}, True, [], tempdir),
94
                stdin_context() as stdin,
95
                stdio_destination(stdin.fileno(), stdout_fileno, stderr_fileno),
96
            ):
97
                return func(*args, **kwargs)
2✔
98

99
        return wrapper
2✔
100

101
    if original_function:
2✔
102
        return _decorate(original_function)
2✔
103
    return _decorate
×
104

105

106
@contextmanager
2✔
107
def engine_error(
2✔
108
    expected_underlying_exception: type[Exception] = Exception,
109
    *,
110
    contains: str | None = None,
111
    normalize_tracebacks: bool = False,
112
) -> Iterator[None]:
113
    """A context manager to catch `ExecutionError`s in tests and check that the underlying exception
114
    is expected.
115

116
    Use like this:
117

118
        with engine_error(ValueError, contains="foo"):
119
            rule_runner.request(OutputType, [input])
120

121
    Will raise AssertionError if no ExecutionError occurred.
122

123
    Set `normalize_tracebacks=True` to replace file locations and addresses in the error message
124
    with fixed values for testability, and check `contains` against the `ExecutionError` message
125
    instead of the underlying error only.
126
    """
UNCOV
127
    try:
×
UNCOV
128
        yield
×
UNCOV
129
    except ExecutionError as exec_error:
×
UNCOV
130
        if not len(exec_error.wrapped_exceptions) == 1:
×
131
            formatted_errors = "\n\n".join(repr(e) for e in exec_error.wrapped_exceptions)
×
132
            raise ValueError(
×
133
                softwrap(
134
                    f"""
135
                    Multiple underlying exceptions, but this helper function expected only one.
136
                    Use `with pytest.raises(ExecutionError) as exc` directly and inspect
137
                    `exc.value.wrapped_exceptions`.
138

139
                    Errors: {formatted_errors}
140
                    """
141
                )
142
            )
UNCOV
143
        underlying = exec_error.wrapped_exceptions[0]
×
UNCOV
144
        if not isinstance(underlying, expected_underlying_exception):
×
145
            raise AssertionError(
×
146
                softwrap(
147
                    f"""
148
                    ExecutionError occurred as expected, but the underlying exception had type
149
                    {type(underlying)} rather than the expected type
150
                    {expected_underlying_exception}:
151

152
                    {underlying}
153
                    """
154
                )
155
            )
UNCOV
156
        if contains is not None:
×
UNCOV
157
            if normalize_tracebacks:
×
158
                errmsg = remove_locations_from_traceback(str(exec_error))
×
159
            else:
UNCOV
160
                errmsg = str(underlying)
×
UNCOV
161
            if contains not in errmsg:
×
162
                diff = "\n".join(
×
163
                    difflib.Differ().compare(contains.splitlines(), errmsg.splitlines())
164
                )
165
                raise AssertionError(
×
166
                    softwrap(
167
                        f"""
168
                        Expected value not found in exception.
169

170
                        => Expected: {contains}
171

172
                        => Actual: {errmsg}
173

174
                        => Diff: {diff}
175
                        """
176
                    )
177
                )
178
    else:
UNCOV
179
        raise AssertionError(
×
180
            softwrap(
181
                f"""
182
                DID NOT RAISE ExecutionError with underlying exception type
183
                {expected_underlying_exception}.
184
                """
185
            )
186
        )
187

188

189
def remove_locations_from_traceback(trace: str) -> str:
2✔
190
    location_pattern = re.compile(r'"/.*", line \d+')
×
191
    address_pattern = re.compile(r"0x[0-9a-f]+")
×
192
    new_trace = location_pattern.sub("LOCATION-INFO", trace)
×
193
    new_trace = address_pattern.sub("0xEEEEEEEEE", new_trace)
×
194
    return new_trace
×
195

196

197
# -----------------------------------------------------------------------------------------------
198
# `RuleRunner`
199
# -----------------------------------------------------------------------------------------------
200

201

202
_I = TypeVar("_I")
2✔
203
_O = TypeVar("_O")
2✔
204

205

206
# A global executor for Schedulers created in unit tests, which is shutdown using `atexit`. This
207
# allows for reusing threads, and avoids waiting for straggling tasks during teardown of each test.
208
EXECUTOR = PyExecutor(
2✔
209
    # Use the ~minimum possible parallelism since integration tests using RuleRunner will already
210
    # be run by Pants using an appropriate Parallelism. We must set max_threads > core_threads; so
211
    # 2 is the minimum, but, via trial and error, 3 minimizes test times on average.
212
    core_threads=1,
213
    max_threads=3,
214
)
215
atexit.register(lambda: EXECUTOR.shutdown(5))
2✔
216

217

218
# Environment variable names required for locating Python interpreters, for use with RuleRunner's
219
# env_inherit arguments.
220
# TODO: This is verbose and redundant: see https://github.com/pantsbuild/pants/issues/13350.
221
PYTHON_BOOTSTRAP_ENV = {"PATH", "PYENV_ROOT", "HOME"}
2✔
222

223

224
@dataclass(frozen=True)
2✔
225
class GoalRuleResult:
2✔
226
    exit_code: int
2✔
227
    stdout: str
2✔
228
    stderr: str
2✔
229

230
    @staticmethod
2✔
231
    def noop() -> GoalRuleResult:
2✔
UNCOV
232
        return GoalRuleResult(0, stdout="", stderr="")
×
233

234

235
# This is not frozen because we need to update the `scheduler` when setting options.
236
@dataclass
2✔
237
class RuleRunner:
2✔
238
    build_root: str
2✔
239
    options_bootstrapper: OptionsBootstrapper
2✔
240
    extra_session_values: dict[Any, Any]
2✔
241
    max_workunit_verbosity: LogLevel
2✔
242
    build_config: BuildConfiguration
2✔
243
    scheduler: SchedulerSession
2✔
244
    rules: tuple[Any, ...]
2✔
245

246
    def __init__(
2✔
247
        self,
248
        *,
249
        rules: Iterable | None = None,
250
        target_types: Iterable[type[Target]] | None = None,
251
        objects: dict[str, Any] | None = None,
252
        aliases: Iterable[BuildFileAliases] | None = None,
253
        context_aware_object_factories: dict[str, Any] | None = None,
254
        isolated_local_store: bool = False,
255
        preserve_tmpdirs: bool = False,
256
        ca_certs_path: str | None = None,
257
        bootstrap_args: Iterable[str] = (),
258
        extra_session_values: dict[Any, Any] | None = None,
259
        max_workunit_verbosity: LogLevel = LogLevel.DEBUG,
260
        inherent_environment: EnvironmentName | None = EnvironmentName(None),
261
        is_bootstrap: bool = False,
262
        auxiliary_goals: Iterable[type[AuxiliaryGoal]] | None = None,
263
    ) -> None:
264
        bootstrap_args = [*bootstrap_args]
2✔
265

266
        root_dir: Path | None = None
2✔
267
        if preserve_tmpdirs:
2✔
UNCOV
268
            root_dir = Path(mkdtemp(prefix="RuleRunner."))
×
UNCOV
269
            print(f"Preserving rule runner temporary directories at {root_dir}.", file=sys.stderr)
×
UNCOV
270
            bootstrap_args.extend(
×
271
                ["--keep-sandboxes=always", f"--local-execution-root-dir={root_dir}"]
272
            )
UNCOV
273
            build_root = (root_dir / "BUILD_ROOT").resolve()
×
UNCOV
274
            build_root.mkdir()
×
UNCOV
275
            self.build_root = str(build_root)
×
276
        else:
277
            self.build_root = os.path.realpath(safe_mkdtemp(prefix="_BUILD_ROOT"))
2✔
278

279
        safe_mkdir(self.pants_workdir)
2✔
280
        BuildRoot().path = self.build_root
2✔
281

282
        def rewrite_rule_for_inherent_environment(rule):
2✔
283
            if not inherent_environment or not isinstance(rule, QueryRule):
2✔
284
                return rule
2✔
285
            return QueryRule(rule.output_type, OrderedSet((*rule.input_types, EnvironmentName)))
2✔
286

287
        # TODO: Redesign rule registration for tests to be more ergonomic and to make this less
288
        #  special-cased.
289
        self.rules = tuple(rewrite_rule_for_inherent_environment(rule) for rule in (rules or ()))
2✔
290
        all_rules = (
2✔
291
            *self.rules,
292
            *build_configuration.rules(),
293
            *source_root.rules(),
294
            *options_parsing.rules(),
295
            *misc.rules(),
296
            *adhoc_binaries.rules(),
297
            # Many tests indirectly rely on this rule.
298
            generate_run_request,
299
            QueryRule(WrappedTarget, [WrappedTargetRequest]),
300
            QueryRule(AllTargets, []),
301
            QueryRule(UnionMembership, []),
302
        )
303
        build_config_builder = BuildConfiguration.Builder()
2✔
304
        build_config_builder.register_aliases(
2✔
305
            BuildFileAliases(
306
                objects=objects, context_aware_object_factories=context_aware_object_factories
307
            )
308
        )
309
        aliases = aliases or ()
2✔
310
        for build_file_aliases in aliases:
2✔
311
            build_config_builder.register_aliases(build_file_aliases)
×
312

313
        build_config_builder.register_rules("_dummy_for_test_", all_rules)
2✔
314
        build_config_builder.register_target_types("_dummy_for_test_", target_types or ())
2✔
315
        build_config_builder.register_auxiliary_goals("_dummy_for_test_", auxiliary_goals or ())
2✔
316
        self.build_config = build_config_builder.create()
2✔
317

318
        self.environment = CompleteEnvironmentVars({})
2✔
319
        self.extra_session_values = extra_session_values or {}
2✔
320
        self.inherent_environment = inherent_environment
2✔
321
        self.max_workunit_verbosity = max_workunit_verbosity
2✔
322

323
        # Change cwd and add sentinel file (BUILDROOT) so NativeOptionParser can find build_root.
324
        with self.pushd():
2✔
325
            Path("BUILDROOT").touch()
2✔
326
            self.options_bootstrapper = self.create_options_bootstrapper(
2✔
327
                args=bootstrap_args, env=None
328
            )
329
            options = self.options_bootstrapper.full_options(
2✔
330
                known_scope_infos=self.build_config.known_scope_infos,
331
                union_membership=UnionMembership.from_rules(
332
                    rule for rule in self.rules if isinstance(rule, UnionRule)
333
                ),
334
                allow_unknown_options=self.build_config.allow_unknown_options,
335
            )
336
            global_options = self.options_bootstrapper.bootstrap_options.for_global_scope()
2✔
337

338
        dynamic_remote_options, _ = DynamicRemoteOptions.from_options(options, self.environment)
2✔
339
        local_store_options = LocalStoreOptions.from_options(global_options)
2✔
340
        if isolated_local_store:
2✔
341
            if root_dir:
×
342
                lmdb_store_dir = root_dir / "lmdb_store"
×
343
                lmdb_store_dir.mkdir()
×
344
                store_dir = str(lmdb_store_dir)
×
345
            else:
346
                store_dir = safe_mkdtemp(prefix="lmdb_store.")
×
347
            local_store_options = dataclasses.replace(local_store_options, store_dir=store_dir)
×
348

349
        local_execution_root_dir = global_options.local_execution_root_dir
2✔
350
        named_caches_dir = global_options.named_caches_dir
2✔
351

352
        self._set_new_session(
2✔
353
            EngineInitializer.setup_graph_extended(
354
                pants_ignore_patterns=GlobalOptions.compute_pants_ignore(
355
                    self.build_root, global_options
356
                ),
357
                use_gitignore=False,
358
                local_store_options=local_store_options,
359
                local_execution_root_dir=local_execution_root_dir,
360
                named_caches_dir=named_caches_dir,
361
                pants_workdir=self.pants_workdir,
362
                build_root=self.build_root,
363
                build_configuration=self.build_config,
364
                # Each Scheduler that is created borrows the global executor, which is shut down `atexit`.
365
                executor=EXECUTOR.to_borrowed(),
366
                execution_options=ExecutionOptions.from_options(
367
                    global_options, dynamic_remote_options
368
                ),
369
                ca_certs_path=ca_certs_path,
370
                engine_visualize_to=None,
371
                is_bootstrap=is_bootstrap,
372
            ).scheduler
373
        )
374

375
    def __repr__(self) -> str:
2✔
UNCOV
376
        return f"RuleRunner(build_root={self.build_root})"
×
377

378
    def _set_new_session(self, scheduler: Scheduler) -> None:
2✔
379
        self.scheduler = scheduler.new_session(
2✔
380
            build_id="buildid_for_test",
381
            session_values=SessionValues(
382
                {
383
                    OptionsBootstrapper: self.options_bootstrapper,
384
                    CompleteEnvironmentVars: self.environment,
385
                    CurrentExecutingGoals: CurrentExecutingGoals(),
386
                    **self.extra_session_values,
387
                }
388
            ),
389
            max_workunit_level=self.max_workunit_verbosity,
390
        )
391

392
    @property
2✔
393
    def pants_workdir(self) -> str:
2✔
394
        return os.path.join(self.build_root, ".pants.d", "workdir")
2✔
395

396
    @property
2✔
397
    def target_types(self) -> tuple[type[Target], ...]:
2✔
398
        return self.build_config.target_types
×
399

400
    @property
2✔
401
    def union_membership(self) -> UnionMembership:
2✔
402
        """An instance of `UnionMembership` with all the test's registered `UnionRule`s."""
403
        return self.request(UnionMembership, [])
2✔
404

405
    def new_session(self, build_id: str) -> None:
2✔
406
        """Mutates this RuleRunner to begin a new Session with the same Scheduler."""
UNCOV
407
        self.scheduler = self.scheduler.scheduler.new_session(build_id)
×
408

409
    def request(self, output_type: type[_O], inputs: Iterable[Any]) -> _O:
2✔
410
        params = (
2✔
411
            Params(*inputs, self.inherent_environment)
412
            if self.inherent_environment
413
            else Params(*inputs)
414
        )
415
        with self.pushd():
2✔
416
            result = assert_single_element(self.scheduler.product_request(output_type, params))
2✔
417
        return cast(_O, result)
2✔
418

419
    def run_goal_rule(
2✔
420
        self,
421
        goal: type[Goal],
422
        *,
423
        global_args: Iterable[str] | None = None,
424
        args: Iterable[str] | None = None,
425
        env: Mapping[str, str] | None = None,
426
        env_inherit: set[str] | None = None,
427
    ) -> GoalRuleResult:
428
        merged_args = (*(global_args or []), goal.name, *(args or []))
2✔
429
        self.set_options(merged_args, env=env, env_inherit=env_inherit)
2✔
430

431
        with self.pushd():
2✔
432
            raw_specs = self.options_bootstrapper.full_options_for_scopes(
2✔
433
                [GlobalOptions.get_scope_info(), goal.subsystem_cls.get_scope_info()],
434
                self.union_membership,
435
            ).specs
436
        specs = SpecsParser(root_dir=self.build_root).parse_specs(
2✔
437
            raw_specs, description_of_origin="RuleRunner.run_goal_rule()"
438
        )
439

440
        stdout, stderr = StringIO(), StringIO()
2✔
441
        console = Console(stdout=stdout, stderr=stderr, use_colors=False, session=self.scheduler)
2✔
442

443
        with self.pushd():
2✔
444
            exit_code = self.scheduler.run_goal_rule(
2✔
445
                goal,
446
                Params(
447
                    specs,
448
                    console,
449
                    Workspace(self.scheduler),
450
                    *([self.inherent_environment] if self.inherent_environment else []),
451
                ),
452
            )
453

454
        console.flush()
2✔
455
        return GoalRuleResult(exit_code, stdout.getvalue(), stderr.getvalue())
2✔
456

457
    @contextmanager
2✔
458
    def pushd(self):
2✔
459
        with pushd(self.build_root):
2✔
460
            yield
2✔
461

462
    def create_options_bootstrapper(
2✔
463
        self, args: Iterable[str], env: Mapping[str, str] | None
464
    ) -> OptionsBootstrapper:
465
        return create_options_bootstrapper(args=args, env=env)
2✔
466

467
    def set_options(
2✔
468
        self,
469
        args: Iterable[str],
470
        *,
471
        env: Mapping[str, str] | None = None,
472
        env_inherit: set[str] | None = None,
473
    ) -> None:
474
        """Update the engine session with new options and/or environment variables.
475

476
        The environment variables will be used to set the `CompleteEnvironmentVars`, which is the
477
        environment variables captured by the parent Pants process. Some rules use this to be able
478
        to read arbitrary env vars. Any options that start with `PANTS_` will also be used to set
479
        options.
480

481
        Environment variables listed in `env_inherit` and not in `env` will be inherited from the test
482
        runner's environment (os.environ)
483

484
        This will override any previously configured values.
485
        """
486
        env = {
2✔
487
            **{k: os.environ[k] for k in (env_inherit or set()) if k in os.environ},
488
            **(env or {}),
489
        }
490
        with self.pushd():
2✔
491
            self.options_bootstrapper = self.create_options_bootstrapper(args=args, env=env)
2✔
492
        self.environment = CompleteEnvironmentVars(env)
2✔
493
        self._set_new_session(self.scheduler.scheduler)
2✔
494

495
    def set_session_values(
2✔
496
        self,
497
        extra_session_values: dict[Any, Any],
498
    ) -> None:
499
        """Update the engine Session with new session_values."""
UNCOV
500
        self.extra_session_values = extra_session_values
×
UNCOV
501
        self._set_new_session(self.scheduler.scheduler)
×
502

503
    def _invalidate_for(self, *relpaths: str):
2✔
504
        """Invalidates all files from the relpath, recursively up to the root.
505

506
        Many python operations implicitly create parent directories, so we assume that touching a
507
        file located below directories that do not currently exist will result in their creation.
508
        """
509
        files = {f for relpath in relpaths for f in recursive_dirname(relpath)}
2✔
510
        return self.scheduler.invalidate_files(files)
2✔
511

512
    def chmod(self, relpath: str | PurePath, mode: int) -> None:
2✔
513
        """Change the file mode and permissions.
514

515
        relpath: The relative path to the file or directory from the build root.
516
        mode: The file mode to set, preferable in octal representation, e.g. `mode=0o750`.
517
        """
518
        Path(self.build_root, relpath).chmod(mode)
×
519
        self._invalidate_for(str(relpath))
×
520

521
    def create_dir(self, relpath: str) -> str:
2✔
522
        """Creates a directory under the buildroot.
523

524
        :API: public
525

526
        relpath: The relative path to the directory from the build root.
527
        """
528
        path = os.path.join(self.build_root, relpath)
×
529
        safe_mkdir(path)
×
530
        self._invalidate_for(relpath)
×
531
        return path
×
532

533
    def _create_file(
2✔
534
        self, relpath: str | PurePath, contents: bytes | str = "", mode: str = "w"
535
    ) -> str:
536
        """Writes to a file under the buildroot.
537

538
        relpath: The relative path to the file from the build root.
539
        contents: A string containing the contents of the file - '' by default..
540
        mode: The mode to write to the file in - over-write by default.
541
        """
542
        path = os.path.join(self.build_root, relpath)
2✔
543
        with safe_open(path, mode=mode) as fp:
2✔
544
            fp.write(contents)
2✔
545
        self._invalidate_for(str(relpath))
2✔
546
        return path
2✔
547

548
    @overload
549
    def write_files(self, files: Mapping[str, str | bytes]) -> tuple[str, ...]: ...
550

551
    @overload
552
    def write_files(self, files: Mapping[PurePath, str | bytes]) -> tuple[str, ...]: ...
553

554
    def write_files(
2✔
555
        self, files: Mapping[PurePath, str | bytes] | Mapping[str, str | bytes]
556
    ) -> tuple[str, ...]:
557
        """Write the files to the build root.
558

559
        :API: public
560

561
        files: A mapping of file names to contents.
562
        returns: A tuple of absolute file paths created.
563
        """
564
        paths = []
2✔
565
        for path, content in files.items():
2✔
566
            paths.append(
2✔
567
                self._create_file(path, content, mode="wb" if isinstance(content, bytes) else "w")
568
            )
569
        return tuple(paths)
2✔
570

571
    def read_file(self, file: str | PurePath, mode: str = "r") -> str | bytes:
2✔
572
        """Read a file that was written to the build root, useful for testing."""
573
        path = os.path.join(self.build_root, file)
×
574
        with safe_open(path, mode=mode) as fp:
×
575
            if "b" in mode:
×
576
                return bytes(fp.read())
×
577
            return str(fp.read())
×
578

579
    def make_snapshot(self, files: Mapping[str, str | bytes]) -> Snapshot:
2✔
580
        """Makes a snapshot from a map of file name to file content.
581

582
        :API: public
583
        """
584
        file_contents = [
2✔
585
            FileContent(path, content.encode() if isinstance(content, str) else content)
586
            for path, content in files.items()
587
        ]
588
        digest = self.request(Digest, [CreateDigest(file_contents)])
2✔
589
        return self.request(Snapshot, [digest])
2✔
590

591
    def make_snapshot_of_empty_files(self, files: Iterable[str]) -> Snapshot:
2✔
592
        """Makes a snapshot with empty content for each file.
593

594
        This is a convenience around `TestBase.make_snapshot`, which allows specifying the content
595
        for each file.
596

597
        :API: public
598
        """
UNCOV
599
        return self.make_snapshot(dict.fromkeys(files, ""))
×
600

601
    def get_target(self, address: Address) -> Target:
2✔
602
        """Find the target for a given address.
603

604
        This requires that the target actually exists, i.e. that you set up its BUILD file.
605

606
        :API: public
607
        """
608
        return self.request(
2✔
609
            WrappedTarget,
610
            [WrappedTargetRequest(address, description_of_origin="RuleRunner.get_target()")],
611
        ).target
612

613
    def write_digest(
2✔
614
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
615
    ) -> None:
616
        """Write a digest to disk, relative to the test's build root.
617

618
        Access the written files by using `os.path.join(rule_runner.build_root, <relpath>)`.
619
        """
620
        native_engine.write_digest(
2✔
621
            self.scheduler.py_scheduler,
622
            self.scheduler.py_session,
623
            digest,
624
            path_prefix or "",
625
            clear_paths,
626
        )
627

628
    def run_interactive_process(self, request: InteractiveProcess) -> InteractiveProcessResult:
2✔
629
        with self.pushd():
2✔
630
            return native_engine.session_run_interactive_process(
2✔
631
                self.scheduler.py_session,
632
                request,
633
                ProcessExecutionEnvironment(
634
                    environment_name=None,
635
                    platform=Platform.create_for_localhost().value,
636
                    docker_image=None,
637
                    remote_execution=False,
638
                    remote_execution_extra_platform_properties=[],
639
                    execute_in_workspace=False,
640
                    keep_sandboxes="never",
641
                ),
642
            )
643

644

645
# -----------------------------------------------------------------------------------------------
646
# `run_rule_with_mocks()`
647
# -----------------------------------------------------------------------------------------------
648

649

650
def run_rule_with_mocks(
2✔
651
    rule: Callable[..., Coroutine[Any, Any, _O]],
652
    *,
653
    rule_args: Sequence[Any] = (),
654
    mock_calls: Mapping[str, Callable] | None = None,
655
    union_membership: UnionMembership | None = None,
656
    show_warnings: bool = True,
657
) -> _O:
658
    """A test helper that runs an @rule with a set of args and mocked underlying @rule invocations.
659

660
    An @rule named `my_rule` that takes one argument and invokes no other @rules (by-name  or via
661
    `Get` requests) can be invoked like so:
662

663
    ```
664
    return_value = run_rule_with_mocks(my_rule, rule_args=[arg1])
665
    ```
666

667
    In the case of an @rule that invokes other @rules, either by name or via `Get` requests,
668
    the `mock_calls` argument must be provided.
669

670
    `mock_calls` is a mapping of fully-qualified rule name to the function that mocks that rule,
671
    and mocks out calls by name to the corresponding rules.
672

673
    So in the case of an @rule named `my_co_rule` that takes one argument and calls the @rule
674
    `path.to.module.list_dir` by name to produce a `Listing` from a `Dir`, the invoke might look
675
    like:
676

677
    ```
678
    return_value = run_rule_with_mocks(
679
      my_co_rule,
680
      rule_args=[arg1],
681
      mock_calls={
682
        "path.to.module.list_dir": lambda dir_subject: Listing(..),
683
      },
684
    )
685
    ```
686

687
    :returns: The return value of the completed @rule.
688
    """
689
    mock_calls = mock_calls or {}
2✔
690

691
    task_rule = getattr(rule, "rule", None)
2✔
692

693
    func: Callable[..., Coroutine[Any, Any, _O]] | Callable[..., _O]
694

695
    # Perform additional validation on `@rule` that the correct args are provided. We don't have
696
    # an easy way to do this for async helper calls yet.
697
    if task_rule:
2✔
698
        if len(rule_args) != len(task_rule.parameters):
2✔
699
            raise ValueError(
×
700
                "Error running rule with mocks:\n"
701
                f"Rule {task_rule.func.__qualname__} expected to receive arguments of the "
702
                f"form: {task_rule.parameters}; got: {rule_args}"
703
            )
704

705
        # Access the original function, rather than the trampoline that we would get by calling
706
        # it directly.
707
        func = task_rule.func
2✔
708
    else:
UNCOV
709
        func = rule
×
710

711
    res = func(*(rule_args or ()))
2✔
712
    if not isinstance(res, (Coroutine, Generator)):
2✔
713
        return res
×
714

715
    unconsumed_mock_calls = set(mock_calls.keys())
2✔
716

717
    def get(res: Any):
2✔
718
        if not isinstance(res, Call):
2✔
719
            raise AssertionError(f"Bad arg type: {res}")
×
720
        mock_call = mock_calls.get(res.rule_id)
2✔
721
        if mock_call is None:
2✔
722
            raise AssertionError(f"No mock_call provided for {res.rule_id}.")
×
723
        unconsumed_mock_calls.discard(res.rule_id)
2✔
724
        # NB: if the mock declares an `__implicitly` parameter, forward the raw `(dict,)` so it
725
        # can inspect declared types (e.g. to route polymorphic dispatch); otherwise unpack the
726
        # implicit values positionally.
727
        implicit = res.implicit_args
2✔
728
        if implicit and "__implicitly" in inspect.signature(mock_call).parameters:
2✔
UNCOV
729
            return mock_call(*res.args, __implicitly=(implicit,))
×
730
        return mock_call(*res.args, *implicit)
2✔
731

732
    rule_coroutine = res
2✔
733
    rule_input = None
2✔
734

735
    def warn_on_unconsumed_mocks():
2✔
736
        # Note that we used `warnings` instead of `logger.warning` because the latter may
737
        # get captured or swallowed by the test framework.
738
        if show_warnings:
2✔
739
            if unconsumed_mock_calls:
2✔
740
                warnings.warn(f"Unconsumed mock_calls: {unconsumed_mock_calls}")
2✔
741

742
    while True:
2✔
743
        try:
2✔
744
            res = rule_coroutine.send(rule_input)
2✔
745
            if isinstance(res, Call):
2✔
746
                rule_input = get(res)
2✔
UNCOV
747
            elif isinstance(res, _Concurrently):
×
UNCOV
748
                rule_input = [get(g) for g in res.calls]
×
749
            elif type(res) in (tuple, list):
×
750
                rule_input = [get(g) for g in res]
×
751
            else:
752
                warn_on_unconsumed_mocks()
×
753
                return res  # type: ignore[no-any-return]
×
754
        except StopIteration as e:
2✔
755
            warn_on_unconsumed_mocks()
2✔
756
            return e.value  # type: ignore[no-any-return]
2✔
757

758

759
@contextmanager
2✔
760
def stdin_context(content: bytes | str | None = None):
2✔
761
    if content is None:
2✔
762
        yield open("/dev/null")
2✔
763
    else:
764
        with temporary_file(binary_mode=isinstance(content, bytes)) as stdin_file:
×
765
            stdin_file.write(content)
×
766
            stdin_file.close()
×
767
            yield open(stdin_file.name)
×
768

769

770
@contextmanager
2✔
771
def mock_console(
2✔
772
    options_bootstrapper: OptionsBootstrapper,
773
    *,
774
    stdin_content: bytes | str | None = None,
775
) -> Iterator[tuple[Console, StdioReader]]:
776
    with pushd(get_buildroot()):
2✔
777
        global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()
2✔
778
        colors = (
2✔
779
            options_bootstrapper.full_options_for_scopes(
780
                [GlobalOptions.get_scope_info()],
781
                UnionMembership.empty(),
782
                allow_unknown_options=True,
783
            )
784
            .for_global_scope()
785
            .colors
786
        )
787

788
    with (
2✔
789
        initialize_stdio(global_bootstrap_options),
790
        stdin_context(stdin_content) as stdin,
791
        temporary_file(binary_mode=False) as stdout,
792
        temporary_file(binary_mode=False) as stderr,
793
        stdio_destination(
794
            stdin_fileno=stdin.fileno(),
795
            stdout_fileno=stdout.fileno(),
796
            stderr_fileno=stderr.fileno(),
797
        ),
798
    ):
799
        # NB: We yield a Console without overriding the destination argument, because we have
800
        # already done a sys.std* level replacement. The replacement is necessary in order for
801
        # InteractiveProcess to have native file handles to interact with.
802
        yield (
2✔
803
            Console(use_colors=colors),
804
            StdioReader(_stdout=Path(stdout.name), _stderr=Path(stderr.name)),
805
        )
806

807

808
@dataclass
2✔
809
class StdioReader:
2✔
810
    _stdout: Path
2✔
811
    _stderr: Path
2✔
812

813
    def get_stdout(self) -> str:
2✔
814
        """Return all data that has been flushed to stdout so far."""
815
        return self._stdout.read_text()
2✔
816

817
    def get_stderr(self) -> str:
2✔
818
        """Return all data that has been flushed to stderr so far."""
UNCOV
819
        return self._stderr.read_text()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc