• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19529437518

20 Nov 2025 07:44AM UTC coverage: 78.884% (-1.4%) from 80.302%
19529437518

push

github

web-flow
nfpm.native_libs: Add RPM package depends from packaged pex_binaries (#22899)

## PR Series Overview

This is the second in a series of PRs that introduces a new backend:
`pants.backend.npm.native_libs`
Initially, the backend will be available as:
`pants.backend.experimental.nfpm.native_libs`

I proposed this new backend (originally named `bindeps`) in discussion
#22396.

This backend will inspect ELF bin/lib files (like `lib*.so`) in packaged
contents (for this PR series, only in `pex_binary` targets) to identify
package dependency metadata and inject that metadata on the relevant
`nfpm_deb_package` or `nfpm_rpm_package` targets. Effectively, it will
provide an approximation of these native packager features:
- `rpm`: `rpmdeps` + `elfdeps`
- `deb`: `dh_shlibdeps` + `dpkg-shlibdeps` (These substitute
`${shlibs:Depends}` in debian control files have)

### Goal: Host-agnostic package builds

This pants backend is designed to be host-agnostic, like
[nFPM](https://nfpm.goreleaser.com/).

Native packaging tools are often restricted to a single release of a
single distro. Unlike native package builders, this new pants backend
does not use any of those distro-specific or distro-release-specific
utilities or local package databases. This new backend should be able to
run (help with building deb and rpm packages) anywhere that pants can
run (MacOS, rpm linux distros, deb linux distros, other linux distros,
docker, ...).

### Previous PRs in series

- #22873

## PR Overview

This PR adds rules in `nfpm.native_libs` to add package dependency
metadata to `nfpm_rpm_package`. The 2 new rules are:

- `inject_native_libs_dependencies_in_package_fields`:

    - An implementation of the polymorphic rule `inject_nfpm_package_fields`.
      This rule is low priority (`priority = 2`) so that in-repo plugins can
      override/augment what it injects. (See #22864)

    - Rule logic overview:
        - find any pex_binaries that will be packaged in an `nfpm_rpm_package`
   ... (continued)

96 of 118 new or added lines in 3 files covered. (81.36%)

910 existing lines in 53 files now uncovered.

73897 of 93678 relevant lines covered (78.88%)

3.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.38
/src/python/pants/testutil/rule_runner.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
11✔
5

6
import atexit
11✔
7
import dataclasses
11✔
8
import difflib
11✔
9
import functools
11✔
10
import inspect
11✔
11
import os
11✔
12
import re
11✔
13
import sys
11✔
14
import warnings
11✔
15
from collections.abc import Callable, Coroutine, Generator, Iterable, Iterator, Mapping, Sequence
11✔
16
from contextlib import contextmanager
11✔
17
from dataclasses import dataclass
11✔
18
from io import StringIO
11✔
19
from pathlib import Path, PurePath
11✔
20
from tempfile import mkdtemp
11✔
21
from typing import Any, Generic, TypeVar, cast, overload
11✔
22

23
from pants.base.build_environment import get_buildroot
11✔
24
from pants.base.build_root import BuildRoot
11✔
25
from pants.base.specs_parser import SpecsParser
11✔
26
from pants.build_graph import build_configuration
11✔
27
from pants.build_graph.build_configuration import BuildConfiguration
11✔
28
from pants.build_graph.build_file_aliases import BuildFileAliases
11✔
29
from pants.core.goals.run import generate_run_request
11✔
30
from pants.core.util_rules import adhoc_binaries, misc
11✔
31
from pants.engine.addresses import Address
11✔
32
from pants.engine.console import Console
11✔
33
from pants.engine.env_vars import CompleteEnvironmentVars
11✔
34
from pants.engine.environment import EnvironmentName
11✔
35
from pants.engine.fs import CreateDigest, Digest, FileContent, Snapshot, Workspace
11✔
36
from pants.engine.goal import CurrentExecutingGoals, Goal
11✔
37
from pants.engine.internals import native_engine, options_parsing
11✔
38
from pants.engine.internals.native_engine import ProcessExecutionEnvironment, PyExecutor
11✔
39
from pants.engine.internals.scheduler import ExecutionError, Scheduler, SchedulerSession
11✔
40
from pants.engine.internals.selectors import Call, Effect, Get, Params
11✔
41
from pants.engine.internals.session import SessionValues
11✔
42
from pants.engine.platform import Platform
11✔
43
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
11✔
44
from pants.engine.rules import QueryRule as QueryRule
11✔
45
from pants.engine.target import AllTargets, Target, WrappedTarget, WrappedTargetRequest
11✔
46
from pants.engine.unions import UnionMembership, UnionRule
11✔
47
from pants.goal.auxiliary_goal import AuxiliaryGoal
11✔
48
from pants.init.engine_initializer import EngineInitializer
11✔
49
from pants.init.logging import initialize_stdio, initialize_stdio_raw, stdio_destination
11✔
50
from pants.option.bootstrap_options import DynamicRemoteOptions, ExecutionOptions, LocalStoreOptions
11✔
51
from pants.option.global_options import GlobalOptions
11✔
52
from pants.option.options_bootstrapper import OptionsBootstrapper
11✔
53
from pants.source import source_root
11✔
54
from pants.testutil.option_util import create_options_bootstrapper
11✔
55
from pants.util.collections import assert_single_element
11✔
56
from pants.util.contextutil import pushd, temporary_dir, temporary_file
11✔
57
from pants.util.dirutil import recursive_dirname, safe_mkdir, safe_mkdtemp, safe_open
11✔
58
from pants.util.logging import LogLevel
11✔
59
from pants.util.ordered_set import OrderedSet
11✔
60
from pants.util.strutil import softwrap
11✔
61

62

63
def logging(original_function=None, *, level: LogLevel = LogLevel.INFO):
11✔
64
    """A decorator that enables logging (optionally at the given level).
65

66
    May be used without a parameter list:
67

68
        ```
69
        @logging
70
        def test_function():
71
            ...
72
        ```
73

74
    ...or with a level argument:
75

76
        ```
77
        @logging(level=LogLevel.DEBUG)
78
        def test_function():
79
            ...
80
        ```
81
    """
82

83
    def _decorate(func):
8✔
84
        @functools.wraps(func)
8✔
85
        def wrapper(*args, **kwargs):
8✔
86
            stdout_fileno, stderr_fileno = sys.stdout.fileno(), sys.stderr.fileno()
8✔
87
            with (
8✔
88
                temporary_dir() as tempdir,
89
                initialize_stdio_raw(level, False, False, {}, True, [], tempdir),
90
                stdin_context() as stdin,
91
                stdio_destination(stdin.fileno(), stdout_fileno, stderr_fileno),
92
            ):
93
                return func(*args, **kwargs)
8✔
94

95
        return wrapper
8✔
96

97
    if original_function:
8✔
98
        return _decorate(original_function)
8✔
UNCOV
99
    return _decorate
×
100

101

102
@contextmanager
11✔
103
def engine_error(
11✔
104
    expected_underlying_exception: type[Exception] = Exception,
105
    *,
106
    contains: str | None = None,
107
    normalize_tracebacks: bool = False,
108
) -> Iterator[None]:
109
    """A context manager to catch `ExecutionError`s in tests and check that the underlying exception
110
    is expected.
111

112
    Use like this:
113

114
        with engine_error(ValueError, contains="foo"):
115
            rule_runner.request(OutputType, [input])
116

117
    Will raise AssertionError if no ExecutionError occurred.
118

119
    Set `normalize_tracebacks=True` to replace file locations and addresses in the error message
120
    with fixed values for testability, and check `contains` against the `ExecutionError` message
121
    instead of the underlying error only.
122
    """
123
    try:
9✔
124
        yield
9✔
125
    except ExecutionError as exec_error:
9✔
126
        if not len(exec_error.wrapped_exceptions) == 1:
9✔
127
            formatted_errors = "\n\n".join(repr(e) for e in exec_error.wrapped_exceptions)
×
128
            raise ValueError(
×
129
                softwrap(
130
                    f"""
131
                    Multiple underlying exceptions, but this helper function expected only one.
132
                    Use `with pytest.raises(ExecutionError) as exc` directly and inspect
133
                    `exc.value.wrapped_exceptions`.
134

135
                    Errors: {formatted_errors}
136
                    """
137
                )
138
            )
139
        underlying = exec_error.wrapped_exceptions[0]
9✔
140
        if not isinstance(underlying, expected_underlying_exception):
9✔
141
            raise AssertionError(
×
142
                softwrap(
143
                    f"""
144
                    ExecutionError occurred as expected, but the underlying exception had type
145
                    {type(underlying)} rather than the expected type
146
                    {expected_underlying_exception}:
147

148
                    {underlying}
149
                    """
150
                )
151
            )
152
        if contains is not None:
9✔
153
            if normalize_tracebacks:
9✔
154
                errmsg = remove_locations_from_traceback(str(exec_error))
1✔
155
            else:
156
                errmsg = str(underlying)
9✔
157
            if contains not in errmsg:
9✔
158
                diff = "\n".join(
×
159
                    difflib.Differ().compare(contains.splitlines(), errmsg.splitlines())
160
                )
161
                raise AssertionError(
×
162
                    softwrap(
163
                        f"""
164
                        Expected value not found in exception.
165

166
                        => Expected: {contains}
167

168
                        => Actual: {errmsg}
169

170
                        => Diff: {diff}
171
                        """
172
                    )
173
                )
174
    else:
175
        raise AssertionError(
1✔
176
            softwrap(
177
                f"""
178
                DID NOT RAISE ExecutionError with underlying exception type
179
                {expected_underlying_exception}.
180
                """
181
            )
182
        )
183

184

185
def remove_locations_from_traceback(trace: str) -> str:
11✔
186
    location_pattern = re.compile(r'"/.*", line \d+')
1✔
187
    address_pattern = re.compile(r"0x[0-9a-f]+")
1✔
188
    new_trace = location_pattern.sub("LOCATION-INFO", trace)
1✔
189
    new_trace = address_pattern.sub("0xEEEEEEEEE", new_trace)
1✔
190
    return new_trace
1✔
191

192

193
# -----------------------------------------------------------------------------------------------
194
# `RuleRunner`
195
# -----------------------------------------------------------------------------------------------
196

197

198
_I = TypeVar("_I")
11✔
199
_O = TypeVar("_O")
11✔
200

201

202
# A global executor for Schedulers created in unit tests, which is shutdown using `atexit`. This
203
# allows for reusing threads, and avoids waiting for straggling tasks during teardown of each test.
204
EXECUTOR = PyExecutor(
11✔
205
    # Use the ~minimum possible parallelism since integration tests using RuleRunner will already
206
    # be run by Pants using an appropriate Parallelism. We must set max_threads > core_threads; so
207
    # 2 is the minimum, but, via trial and error, 3 minimizes test times on average.
208
    core_threads=1,
209
    max_threads=3,
210
)
211
atexit.register(lambda: EXECUTOR.shutdown(5))
11✔
212

213

214
# Environment variable names required for locating Python interpreters, for use with RuleRunner's
215
# env_inherit arguments.
216
# TODO: This is verbose and redundant: see https://github.com/pantsbuild/pants/issues/13350.
217
PYTHON_BOOTSTRAP_ENV = {"PATH", "PYENV_ROOT", "HOME"}
11✔
218

219

220
@dataclass(frozen=True)
11✔
221
class GoalRuleResult:
11✔
222
    exit_code: int
11✔
223
    stdout: str
11✔
224
    stderr: str
11✔
225

226
    @staticmethod
11✔
227
    def noop() -> GoalRuleResult:
11✔
228
        return GoalRuleResult(0, stdout="", stderr="")
1✔
229

230

231
# This is not frozen because we need to update the `scheduler` when setting options.
232
@dataclass
11✔
233
class RuleRunner:
11✔
234
    build_root: str
11✔
235
    options_bootstrapper: OptionsBootstrapper
11✔
236
    extra_session_values: dict[Any, Any]
11✔
237
    max_workunit_verbosity: LogLevel
11✔
238
    build_config: BuildConfiguration
11✔
239
    scheduler: SchedulerSession
11✔
240
    rules: tuple[Any, ...]
11✔
241

242
    def __init__(
11✔
243
        self,
244
        *,
245
        rules: Iterable | None = None,
246
        target_types: Iterable[type[Target]] | None = None,
247
        objects: dict[str, Any] | None = None,
248
        aliases: Iterable[BuildFileAliases] | None = None,
249
        context_aware_object_factories: dict[str, Any] | None = None,
250
        isolated_local_store: bool = False,
251
        preserve_tmpdirs: bool = False,
252
        ca_certs_path: str | None = None,
253
        bootstrap_args: Iterable[str] = (),
254
        extra_session_values: dict[Any, Any] | None = None,
255
        max_workunit_verbosity: LogLevel = LogLevel.DEBUG,
256
        inherent_environment: EnvironmentName | None = EnvironmentName(None),
257
        is_bootstrap: bool = False,
258
        auxiliary_goals: Iterable[type[AuxiliaryGoal]] | None = None,
259
    ) -> None:
260
        bootstrap_args = [*bootstrap_args]
11✔
261

262
        root_dir: Path | None = None
11✔
263
        if preserve_tmpdirs:
11✔
264
            root_dir = Path(mkdtemp(prefix="RuleRunner."))
7✔
265
            print(f"Preserving rule runner temporary directories at {root_dir}.", file=sys.stderr)
7✔
266
            bootstrap_args.extend(
7✔
267
                ["--keep-sandboxes=always", f"--local-execution-root-dir={root_dir}"]
268
            )
269
            build_root = (root_dir / "BUILD_ROOT").resolve()
7✔
270
            build_root.mkdir()
7✔
271
            self.build_root = str(build_root)
7✔
272
        else:
273
            self.build_root = os.path.realpath(safe_mkdtemp(prefix="_BUILD_ROOT"))
11✔
274

275
        safe_mkdir(self.pants_workdir)
11✔
276
        BuildRoot().path = self.build_root
11✔
277

278
        def rewrite_rule_for_inherent_environment(rule):
11✔
279
            if not inherent_environment or not isinstance(rule, QueryRule):
11✔
280
                return rule
11✔
281
            return QueryRule(rule.output_type, OrderedSet((*rule.input_types, EnvironmentName)))
11✔
282

283
        # TODO: Redesign rule registration for tests to be more ergonomic and to make this less
284
        #  special-cased.
285
        self.rules = tuple(rewrite_rule_for_inherent_environment(rule) for rule in (rules or ()))
11✔
286
        all_rules = (
11✔
287
            *self.rules,
288
            *build_configuration.rules(),
289
            *source_root.rules(),
290
            *options_parsing.rules(),
291
            *misc.rules(),
292
            *adhoc_binaries.rules(),
293
            # Many tests indirectly rely on this rule.
294
            generate_run_request,
295
            QueryRule(WrappedTarget, [WrappedTargetRequest]),
296
            QueryRule(AllTargets, []),
297
            QueryRule(UnionMembership, []),
298
        )
299
        build_config_builder = BuildConfiguration.Builder()
11✔
300
        build_config_builder.register_aliases(
11✔
301
            BuildFileAliases(
302
                objects=objects, context_aware_object_factories=context_aware_object_factories
303
            )
304
        )
305
        aliases = aliases or ()
11✔
306
        for build_file_aliases in aliases:
11✔
307
            build_config_builder.register_aliases(build_file_aliases)
1✔
308

309
        build_config_builder.register_rules("_dummy_for_test_", all_rules)
11✔
310
        build_config_builder.register_target_types("_dummy_for_test_", target_types or ())
11✔
311
        build_config_builder.register_auxiliary_goals("_dummy_for_test_", auxiliary_goals or ())
11✔
312
        self.build_config = build_config_builder.create()
11✔
313

314
        self.environment = CompleteEnvironmentVars({})
11✔
315
        self.extra_session_values = extra_session_values or {}
11✔
316
        self.inherent_environment = inherent_environment
11✔
317
        self.max_workunit_verbosity = max_workunit_verbosity
11✔
318

319
        # Change cwd and add sentinel file (BUILDROOT) so NativeOptionParser can find build_root.
320
        with self.pushd():
11✔
321
            Path("BUILDROOT").touch()
11✔
322
            self.options_bootstrapper = self.create_options_bootstrapper(
11✔
323
                args=bootstrap_args, env=None
324
            )
325
            options = self.options_bootstrapper.full_options(
11✔
326
                known_scope_infos=self.build_config.known_scope_infos,
327
                union_membership=UnionMembership.from_rules(
328
                    rule for rule in self.rules if isinstance(rule, UnionRule)
329
                ),
330
                allow_unknown_options=self.build_config.allow_unknown_options,
331
            )
332
            global_options = self.options_bootstrapper.bootstrap_options.for_global_scope()
11✔
333

334
        dynamic_remote_options, _ = DynamicRemoteOptions.from_options(options, self.environment)
11✔
335
        local_store_options = LocalStoreOptions.from_options(global_options)
11✔
336
        if isolated_local_store:
11✔
337
            if root_dir:
4✔
338
                lmdb_store_dir = root_dir / "lmdb_store"
×
339
                lmdb_store_dir.mkdir()
×
340
                store_dir = str(lmdb_store_dir)
×
341
            else:
342
                store_dir = safe_mkdtemp(prefix="lmdb_store.")
4✔
343
            local_store_options = dataclasses.replace(local_store_options, store_dir=store_dir)
4✔
344

345
        local_execution_root_dir = global_options.local_execution_root_dir
11✔
346
        named_caches_dir = global_options.named_caches_dir
11✔
347

348
        self._set_new_session(
11✔
349
            EngineInitializer.setup_graph_extended(
350
                pants_ignore_patterns=GlobalOptions.compute_pants_ignore(
351
                    self.build_root, global_options
352
                ),
353
                use_gitignore=False,
354
                local_store_options=local_store_options,
355
                local_execution_root_dir=local_execution_root_dir,
356
                named_caches_dir=named_caches_dir,
357
                pants_workdir=self.pants_workdir,
358
                build_root=self.build_root,
359
                build_configuration=self.build_config,
360
                # Each Scheduler that is created borrows the global executor, which is shut down `atexit`.
361
                executor=EXECUTOR.to_borrowed(),
362
                execution_options=ExecutionOptions.from_options(
363
                    global_options, dynamic_remote_options
364
                ),
365
                ca_certs_path=ca_certs_path,
366
                engine_visualize_to=None,
367
                is_bootstrap=is_bootstrap,
368
            ).scheduler
369
        )
370

371
    def __repr__(self) -> str:
11✔
372
        return f"RuleRunner(build_root={self.build_root})"
1✔
373

374
    def _set_new_session(self, scheduler: Scheduler) -> None:
11✔
375
        self.scheduler = scheduler.new_session(
11✔
376
            build_id="buildid_for_test",
377
            session_values=SessionValues(
378
                {
379
                    OptionsBootstrapper: self.options_bootstrapper,
380
                    CompleteEnvironmentVars: self.environment,
381
                    CurrentExecutingGoals: CurrentExecutingGoals(),
382
                    **self.extra_session_values,
383
                }
384
            ),
385
            max_workunit_level=self.max_workunit_verbosity,
386
        )
387

388
    @property
11✔
389
    def pants_workdir(self) -> str:
11✔
390
        return os.path.join(self.build_root, ".pants.d", "workdir")
11✔
391

392
    @property
11✔
393
    def target_types(self) -> tuple[type[Target], ...]:
11✔
394
        return self.build_config.target_types
1✔
395

396
    @property
11✔
397
    def union_membership(self) -> UnionMembership:
11✔
398
        """An instance of `UnionMembership` with all the test's registered `UnionRule`s."""
399
        return self.request(UnionMembership, [])
11✔
400

401
    def new_session(self, build_id: str) -> None:
11✔
402
        """Mutates this RuleRunner to begin a new Session with the same Scheduler."""
403
        self.scheduler = self.scheduler.scheduler.new_session(build_id)
4✔
404

405
    def request(self, output_type: type[_O], inputs: Iterable[Any]) -> _O:
11✔
406
        params = (
11✔
407
            Params(*inputs, self.inherent_environment)
408
            if self.inherent_environment
409
            else Params(*inputs)
410
        )
411
        with self.pushd():
11✔
412
            result = assert_single_element(self.scheduler.product_request(output_type, params))
11✔
413
        return cast(_O, result)
11✔
414

415
    def run_goal_rule(
11✔
416
        self,
417
        goal: type[Goal],
418
        *,
419
        global_args: Iterable[str] | None = None,
420
        args: Iterable[str] | None = None,
421
        env: Mapping[str, str] | None = None,
422
        env_inherit: set[str] | None = None,
423
    ) -> GoalRuleResult:
424
        merged_args = (*(global_args or []), goal.name, *(args or []))
11✔
425
        self.set_options(merged_args, env=env, env_inherit=env_inherit)
11✔
426

427
        with self.pushd():
11✔
428
            raw_specs = self.options_bootstrapper.full_options_for_scopes(
11✔
429
                [GlobalOptions.get_scope_info(), goal.subsystem_cls.get_scope_info()],
430
                self.union_membership,
431
            ).specs
432
        specs = SpecsParser(root_dir=self.build_root).parse_specs(
11✔
433
            raw_specs, description_of_origin="RuleRunner.run_goal_rule()"
434
        )
435

436
        stdout, stderr = StringIO(), StringIO()
11✔
437
        console = Console(stdout=stdout, stderr=stderr, use_colors=False, session=self.scheduler)
11✔
438

439
        with self.pushd():
11✔
440
            exit_code = self.scheduler.run_goal_rule(
11✔
441
                goal,
442
                Params(
443
                    specs,
444
                    console,
445
                    Workspace(self.scheduler),
446
                    *([self.inherent_environment] if self.inherent_environment else []),
447
                ),
448
            )
449

450
        console.flush()
11✔
451
        return GoalRuleResult(exit_code, stdout.getvalue(), stderr.getvalue())
11✔
452

453
    @contextmanager
11✔
454
    def pushd(self):
11✔
455
        with pushd(self.build_root):
11✔
456
            yield
11✔
457

458
    def create_options_bootstrapper(
11✔
459
        self, args: Iterable[str], env: Mapping[str, str] | None
460
    ) -> OptionsBootstrapper:
461
        return create_options_bootstrapper(args=args, env=env)
11✔
462

463
    def set_options(
11✔
464
        self,
465
        args: Iterable[str],
466
        *,
467
        env: Mapping[str, str] | None = None,
468
        env_inherit: set[str] | None = None,
469
    ) -> None:
470
        """Update the engine session with new options and/or environment variables.
471

472
        The environment variables will be used to set the `CompleteEnvironmentVars`, which is the
473
        environment variables captured by the parent Pants process. Some rules use this to be able
474
        to read arbitrary env vars. Any options that start with `PANTS_` will also be used to set
475
        options.
476

477
        Environment variables listed in `env_inherit` and not in `env` will be inherited from the test
478
        runner's environment (os.environ)
479

480
        This will override any previously configured values.
481
        """
482
        env = {
11✔
483
            **{k: os.environ[k] for k in (env_inherit or set()) if k in os.environ},
484
            **(env or {}),
485
        }
486
        with self.pushd():
11✔
487
            self.options_bootstrapper = self.create_options_bootstrapper(args=args, env=env)
11✔
488
        self.environment = CompleteEnvironmentVars(env)
11✔
489
        self._set_new_session(self.scheduler.scheduler)
11✔
490

491
    def set_session_values(
11✔
492
        self,
493
        extra_session_values: dict[Any, Any],
494
    ) -> None:
495
        """Update the engine Session with new session_values."""
496
        self.extra_session_values = extra_session_values
3✔
497
        self._set_new_session(self.scheduler.scheduler)
3✔
498

499
    def _invalidate_for(self, *relpaths: str):
11✔
500
        """Invalidates all files from the relpath, recursively up to the root.
501

502
        Many python operations implicitly create parent directories, so we assume that touching a
503
        file located below directories that do not currently exist will result in their creation.
504
        """
505
        files = {f for relpath in relpaths for f in recursive_dirname(relpath)}
11✔
506
        return self.scheduler.invalidate_files(files)
11✔
507

508
    def chmod(self, relpath: str | PurePath, mode: int) -> None:
11✔
509
        """Change the file mode and permissions.
510

511
        relpath: The relative path to the file or directory from the build root.
512
        mode: The file mode to set, preferable in octal representation, e.g. `mode=0o750`.
513
        """
514
        Path(self.build_root, relpath).chmod(mode)
2✔
515
        self._invalidate_for(str(relpath))
2✔
516

517
    def create_dir(self, relpath: str) -> str:
11✔
518
        """Creates a directory under the buildroot.
519

520
        :API: public
521

522
        relpath: The relative path to the directory from the build root.
523
        """
524
        path = os.path.join(self.build_root, relpath)
3✔
525
        safe_mkdir(path)
3✔
526
        self._invalidate_for(relpath)
3✔
527
        return path
3✔
528

529
    def _create_file(
11✔
530
        self, relpath: str | PurePath, contents: bytes | str = "", mode: str = "w"
531
    ) -> str:
532
        """Writes to a file under the buildroot.
533

534
        relpath: The relative path to the file from the build root.
535
        contents: A string containing the contents of the file - '' by default..
536
        mode: The mode to write to the file in - over-write by default.
537
        """
538
        path = os.path.join(self.build_root, relpath)
11✔
539
        with safe_open(path, mode=mode) as fp:
11✔
540
            fp.write(contents)
11✔
541
        self._invalidate_for(str(relpath))
11✔
542
        return path
11✔
543

544
    @overload
545
    def write_files(self, files: Mapping[str, str | bytes]) -> tuple[str, ...]: ...
546

547
    @overload
548
    def write_files(self, files: Mapping[PurePath, str | bytes]) -> tuple[str, ...]: ...
549

550
    def write_files(
11✔
551
        self, files: Mapping[PurePath, str | bytes] | Mapping[str, str | bytes]
552
    ) -> tuple[str, ...]:
553
        """Write the files to the build root.
554

555
        :API: public
556

557
        files: A mapping of file names to contents.
558
        returns: A tuple of absolute file paths created.
559
        """
560
        paths = []
11✔
561
        for path, content in files.items():
11✔
562
            paths.append(
11✔
563
                self._create_file(path, content, mode="wb" if isinstance(content, bytes) else "w")
564
            )
565
        return tuple(paths)
11✔
566

567
    def read_file(self, file: str | PurePath, mode: str = "r") -> str | bytes:
11✔
568
        """Read a file that was written to the build root, useful for testing."""
569
        path = os.path.join(self.build_root, file)
2✔
570
        with safe_open(path, mode=mode) as fp:
2✔
571
            if "b" in mode:
2✔
572
                return bytes(fp.read())
×
573
            return str(fp.read())
2✔
574

575
    def make_snapshot(self, files: Mapping[str, str | bytes]) -> Snapshot:
11✔
576
        """Makes a snapshot from a map of file name to file content.
577

578
        :API: public
579
        """
580
        file_contents = [
11✔
581
            FileContent(path, content.encode() if isinstance(content, str) else content)
582
            for path, content in files.items()
583
        ]
584
        digest = self.request(Digest, [CreateDigest(file_contents)])
11✔
585
        return self.request(Snapshot, [digest])
11✔
586

587
    def make_snapshot_of_empty_files(self, files: Iterable[str]) -> Snapshot:
11✔
588
        """Makes a snapshot with empty content for each file.
589

590
        This is a convenience around `TestBase.make_snapshot`, which allows specifying the content
591
        for each file.
592

593
        :API: public
594
        """
595
        return self.make_snapshot(dict.fromkeys(files, ""))
2✔
596

597
    def get_target(self, address: Address) -> Target:
11✔
598
        """Find the target for a given address.
599

600
        This requires that the target actually exists, i.e. that you set up its BUILD file.
601

602
        :API: public
603
        """
604
        return self.request(
11✔
605
            WrappedTarget,
606
            [WrappedTargetRequest(address, description_of_origin="RuleRunner.get_target()")],
607
        ).target
608

609
    def write_digest(
11✔
610
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
611
    ) -> None:
612
        """Write a digest to disk, relative to the test's build root.
613

614
        Access the written files by using `os.path.join(rule_runner.build_root, <relpath>)`.
615
        """
616
        native_engine.write_digest(
8✔
617
            self.scheduler.py_scheduler,
618
            self.scheduler.py_session,
619
            digest,
620
            path_prefix or "",
621
            clear_paths,
622
        )
623

624
    def run_interactive_process(self, request: InteractiveProcess) -> InteractiveProcessResult:
11✔
625
        with self.pushd():
9✔
626
            return native_engine.session_run_interactive_process(
9✔
627
                self.scheduler.py_session,
628
                request,
629
                ProcessExecutionEnvironment(
630
                    environment_name=None,
631
                    platform=Platform.create_for_localhost().value,
632
                    docker_image=None,
633
                    remote_execution=False,
634
                    remote_execution_extra_platform_properties=[],
635
                    execute_in_workspace=False,
636
                    keep_sandboxes="never",
637
                ),
638
            )
639

640
    def do_not_use_mock(self, output_type: type[Any], input_types: Iterable[type[Any]]) -> MockGet:
11✔
641
        """Returns a `MockGet` whose behavior is to run the actual rule using this `RuleRunner`"""
642
        return MockGet(
1✔
643
            output_type=output_type,
644
            input_types=tuple(input_types),
645
            mock=lambda *input_values: self.request(output_type, input_values),
646
        )
647

648

649
# -----------------------------------------------------------------------------------------------
650
# `run_rule_with_mocks()`
651
# -----------------------------------------------------------------------------------------------
652

653

654
@dataclass(frozen=True)
11✔
655
class MockEffect(Generic[_O]):
11✔
656
    output_type: type[_O]
11✔
657
    input_types: tuple[type, ...]
11✔
658
    mock: Callable[..., _O]
11✔
659

660

661
@dataclass(frozen=True)
11✔
662
class MockGet(Generic[_O]):
11✔
663
    output_type: type[_O]
11✔
664
    input_types: tuple[type, ...]
11✔
665
    mock: Callable[..., _O]
11✔
666

667

668
def run_rule_with_mocks(
11✔
669
    rule: Callable[..., Coroutine[Any, Any, _O]],
670
    *,
671
    rule_args: Sequence[Any] = (),
672
    mock_gets: Sequence[MockGet | MockEffect] = (),
673
    mock_calls: Mapping[str, Callable] | None = None,
674
    union_membership: UnionMembership | None = None,
675
    show_warnings: bool = True,
676
) -> _O:
677
    """A test helper that runs an @rule with a set of args and mocked underlying @rule invocations.
678

679
    An @rule named `my_rule` that takes one argument and invokes no other @rules (by-name  or via
680
    `Get` requests) can be invoked like so:
681

682
    ```
683
    return_value = run_rule_with_mocks(my_rule, rule_args=[arg1])
684
    ```
685

686
    In the case of an @rule that invokes other @rules, either by name or via `Get` requests, things
687
    get more interesting: either or both of the `mock_calls` and `mock_gets` arguments must be
688
    provided.
689

690
    - `mock_calls` is a mapping of fully-qualified rule name to the function that mocks that rule,
691
      and mocks out calls by name to the corresponding rules.
692
    - `mock_gets` is a sequence of `MockGet`s and `MockEffect`s. Each MockGet takes the Product and
693
      Subject type, along with a one-argument function that takes a subject value and returns a
694
      product value.
695

696
    So in the case of an @rule named `my_co_rule` that takes one argument and calls the @rule
697
    `path.to.module.list_dir` by name to produce a `Listing` from a `Dir`, the invoke might look
698
    like:
699

700
    ```
701
    return_value = run_rule_with_mocks(
702
      my_co_rule,
703
      rule_args=[arg1],
704
      mock_calls={
705
        "path.to.module.list_dir": lambda dir_subject: Listing(..),
706
      },
707
    )
708
    ```
709

710
    And if that same rule uses a Get request for a product type `Listing` with subject type `Dir`,
711
    the invoke might look like:
712

713
    ```
714
    return_value = run_rule_with_mocks(
715
      my_co_rule,
716
      rule_args=[arg1],
717
      mock_gets=[
718
        MockGet(
719
          output_type=Listing,
720
          input_type=Dir,
721
          mock=lambda dir_subject: Listing(..),
722
        ),
723
      ],
724
    )
725
    ```
726

727
    If any of the @rule's Get requests involve union members, you should pass a `UnionMembership`
728
    mapping the union base to any union members you'd like to test. For example, if your rule has
729
    `await Get(TestResult, TargetAdaptor, target_adaptor)`, you may pass
730
    `UnionMembership({TargetAdaptor: PythonTestsTargetAdaptor})` to this function.
731

732
    :returns: The return value of the completed @rule.
733
    """
734
    mock_calls = mock_calls or {}
11✔
735

736
    task_rule = getattr(rule, "rule", None)
11✔
737

738
    func: Callable[..., Coroutine[Any, Any, _O]] | Callable[..., _O]
739

740
    # Perform additional validation on `@rule` that the correct args are provided. We don't have
741
    # an easy way to do this for async helper calls yet.
742
    if task_rule:
11✔
743
        if len(rule_args) != len(task_rule.parameters):
11✔
744
            raise ValueError(
×
745
                "Error running rule with mocks:\n"
746
                f"Rule {task_rule.func.__qualname__} expected to receive arguments of the "
747
                f"form: {task_rule.parameters}; got: {rule_args}"
748
            )
749

750
        # Access the original function, rather than the trampoline that we would get by calling
751
        # it directly.
752
        func = task_rule.func
11✔
753
    else:
754
        func = rule
3✔
755

756
    res = func(*(rule_args or ()))
11✔
757
    if not isinstance(res, (Coroutine, Generator)):
11✔
758
        return res
×
759

760
    unconsumed_mock_calls = set(mock_calls.keys())
11✔
761
    unconsumed_mock_gets = set(mock_gets)
11✔
762

763
    def get(res: Get | Effect | Call | Coroutine):
11✔
764
        if isinstance(res, Coroutine):
11✔
765
            # A call-by-name element in a concurrently() is a Coroutine whose frame is
766
            # the trampoline wrapper that creates and immediately awaits the Call.
767
            locals = inspect.getcoroutinelocals(res)
5✔
768
            assert locals is not None
5✔
769
            rule_id = locals["rule_id"]
5✔
770
            args = locals["args"]
5✔
771
            kwargs = dict(locals["kwargs"])
5✔
772
            __implicitly = locals.get("__implicitly")
5✔
773
            if __implicitly:
5✔
774
                kwargs["__implicitly"] = __implicitly
4✔
775
            mock_call = mock_calls.get(rule_id)
5✔
776
            if mock_call:
5✔
777
                unconsumed_mock_calls.discard(rule_id)
5✔
778
                # Close the original, unmocked, coroutine, to prevent the "was never awaited"
779
                # warning polluting stderr data that the test may examine.
780
                res.close()
5✔
781
                return mock_call(*args, **kwargs)
5✔
782
            raise AssertionError(f"No mock_call provided for {rule_id}.")
×
783
        elif isinstance(res, Call):
11✔
784
            mock_call = mock_calls.get(res.rule_id)
11✔
785
            if mock_call:
11✔
786
                unconsumed_mock_calls.discard(res.rule_id)
11✔
787
                return mock_call(*res.inputs)
11✔
788
            # For now we fall through, to allow an old-style MockGet to mock a call-by-name, for
789
            # legacy reasons. But we will deprecate and then remove this in the future, at which
790
            # point we should AssertionError error here as well.
791
            # Note that this fallthrough only works for single call-by-names. When wrapped in a
792
            # concurrently() call, mock_calls *must* be used, hence the error above.
793
            if show_warnings:
1✔
794
                # Note that we used `warnings` instead of `logger.warning` because the latter may
795
                # get captured or swallowed by the test framework. These warnings will go away
796
                # once we're fully on call-by-name, anyway.
797
                warnings.warn(
1✔
798
                    f"No mock_call provided for {res.rule_id}, attempting to find a MockGet to "
799
                    "satisfy it. Note that this will soon be deprecated, so we recommend switching "
800
                    "to mock_call ASAP."
801
                )
802

803
        provider = next(
1✔
804
            (
805
                mock_get
806
                for mock_get in mock_gets
807
                if mock_get.output_type == res.output_type
808
                and all(
809
                    # Either the input type is directly provided.
810
                    input_type in mock_get.input_types
811
                    or (
812
                        # Or the input type is a union and the mock has an input whose
813
                        # type is one of the union members.
814
                        union_membership
815
                        and input_type in union_membership
816
                        and any(
817
                            union_membership.is_member(input_type, t) for t in mock_get.input_types
818
                        )
819
                    )
820
                    for input_type in res.input_types
821
                )
822
            ),
823
            None,
824
        )
825
        if provider is None:
1✔
826
            raise AssertionError(f"Rule requested: {res}, which cannot be satisfied.")
×
827
        unconsumed_mock_gets.discard(provider)
1✔
828
        return provider.mock(*res.inputs)
1✔
829

830
    rule_coroutine = res
11✔
831
    rule_input = None
11✔
832

833
    def warn_on_unconsumed_mocks():
11✔
834
        # Note that we used `warnings` instead of `logger.warning` because the latter may
835
        # get captured or swallowed by the test framework.
836
        if show_warnings:
11✔
837
            if unconsumed_mock_calls:
11✔
838
                warnings.warn(f"Unconsumed mock_calls: {unconsumed_mock_calls}")
9✔
839
            if unconsumed_mock_gets:
11✔
840
                warnings.warn(f"Unconsumed mock_gets: {unconsumed_mock_gets}")
1✔
841

842
    while True:
11✔
843
        try:
11✔
844
            res = rule_coroutine.send(rule_input)
11✔
845
            if isinstance(res, (Get, Effect, Call)):
11✔
846
                rule_input = get(res)
11✔
847
            elif type(res) in (tuple, list):
6✔
848
                rule_input = [get(g) for g in res]
6✔
849
            else:
850
                warn_on_unconsumed_mocks()
×
851
                return res  # type: ignore[no-any-return]
×
852
        except StopIteration as e:
11✔
853
            warn_on_unconsumed_mocks()
11✔
854
            return e.value  # type: ignore[no-any-return]
11✔
855

856

857
@contextmanager
11✔
858
def stdin_context(content: bytes | str | None = None):
11✔
859
    if content is None:
11✔
860
        yield open("/dev/null")
11✔
861
    else:
862
        with temporary_file(binary_mode=isinstance(content, bytes)) as stdin_file:
2✔
863
            stdin_file.write(content)
2✔
864
            stdin_file.close()
2✔
865
            yield open(stdin_file.name)
2✔
866

867

868
@contextmanager
11✔
869
def mock_console(
11✔
870
    options_bootstrapper: OptionsBootstrapper,
871
    *,
872
    stdin_content: bytes | str | None = None,
873
) -> Iterator[tuple[Console, StdioReader]]:
874
    with pushd(get_buildroot()):
11✔
875
        global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()
11✔
876
        colors = (
11✔
877
            options_bootstrapper.full_options_for_scopes(
878
                [GlobalOptions.get_scope_info()],
879
                UnionMembership.empty(),
880
                allow_unknown_options=True,
881
            )
882
            .for_global_scope()
883
            .colors
884
        )
885

886
    with (
11✔
887
        initialize_stdio(global_bootstrap_options),
888
        stdin_context(stdin_content) as stdin,
889
        temporary_file(binary_mode=False) as stdout,
890
        temporary_file(binary_mode=False) as stderr,
891
        stdio_destination(
892
            stdin_fileno=stdin.fileno(),
893
            stdout_fileno=stdout.fileno(),
894
            stderr_fileno=stderr.fileno(),
895
        ),
896
    ):
897
        # NB: We yield a Console without overriding the destination argument, because we have
898
        # already done a sys.std* level replacement. The replacement is necessary in order for
899
        # InteractiveProcess to have native file handles to interact with.
900
        yield (
11✔
901
            Console(use_colors=colors),
902
            StdioReader(_stdout=Path(stdout.name), _stderr=Path(stderr.name)),
903
        )
904

905

906
@dataclass
11✔
907
class StdioReader:
11✔
908
    _stdout: Path
11✔
909
    _stderr: Path
11✔
910

911
    def get_stdout(self) -> str:
11✔
912
        """Return all data that has been flushed to stdout so far."""
913
        return self._stdout.read_text()
10✔
914

915
    def get_stderr(self) -> str:
11✔
916
        """Return all data that has been flushed to stderr so far."""
917
        return self._stderr.read_text()
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc