
pantsbuild / pants · build 19529437518 (push · github · web-flow)

20 Nov 2025 07:44AM UTC coverage: 78.884% (-1.4%) from 80.302%

nfpm.native_libs: Add RPM package depends from packaged pex_binaries (#22899)

## PR Series Overview

This is the second in a series of PRs that introduces a new backend:
`pants.backend.nfpm.native_libs`
Initially, the backend will be available as:
`pants.backend.experimental.nfpm.native_libs`

I proposed this new backend (originally named `bindeps`) in discussion
#22396.

This backend will inspect ELF bin/lib files (like `lib*.so`) in packaged
contents (for this PR series, only in `pex_binary` targets) to identify
package dependency metadata and inject that metadata into the relevant
`nfpm_deb_package` or `nfpm_rpm_package` targets (see the illustrative
sketch after this list). Effectively, it will provide an approximation of
these native packager features:
- `rpm`: `rpmdeps` + `elfdeps`
- `deb`: `dh_shlibdeps` + `dpkg-shlibdeps` (these substitute
`${shlibs:Depends}` in debian control files)

### Goal: Host-agnostic package builds

This pants backend is designed to be host-agnostic, like
[nFPM](https://nfpm.goreleaser.com/).

Native packaging tools are often restricted to a single release of a
single distro. Unlike native package builders, this new pants backend
does not use any of those distro-specific or distro-release-specific
utilities or local package databases. It should be able to help build
deb and rpm packages anywhere that pants can run: macOS, RPM-based Linux
distros, Debian-based Linux distros, other Linux distros, Docker
containers, and so on.

### Previous PRs in series

- #22873

## PR Overview

This PR adds rules to `nfpm.native_libs` that inject package dependency
metadata into `nfpm_rpm_package` targets. The two new rules are:

- `inject_native_libs_dependencies_in_package_fields`:

    - An implementation of the polymorphic rule `inject_nfpm_package_fields`.
      This rule is low priority (`priority = 2`) so that in-repo plugins can
      override/augment what it injects. (See #22864)

    - Rule logic overview:
        - find any pex_binaries that will be packaged in an `nfpm_rpm_package`
   ... (continued)
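
For orientation only: the net effect of the injected metadata is roughly what you would get by
hand-writing dependency requirements on the package target. The target and field names below are
assumptions for illustration, not necessarily the backend's final API.

```python
# BUILD (illustrative sketch only; field names are assumptions, not the final API)
pex_binary(
    name="app",
    entry_point="app.py",
)

nfpm_rpm_package(
    name="app-rpm",
    package_name="app",
    version="1.2.3",
    dependencies=[":app"],
    # Roughly what nfpm.native_libs would compute automatically from the ELF
    # contents of the packaged pex_binary, instead of being written by hand:
    depends=["libc.so.6()(64bit)", "libssl.so.3()(64bit)"],
)
```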

96 of 118 new or added lines in 3 files covered. (81.36%)

910 existing lines in 53 files now uncovered.

73897 of 93678 relevant lines covered (78.88%)

3.21 hits per line

Source File: /src/python/pants/engine/internals/scheduler.py (81.62% covered)

# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import logging
import os
import time
from collections import defaultdict
from collections.abc import Callable, Iterable, Sequence
from dataclasses import dataclass
from pathlib import PurePath
from types import CoroutineType
from typing import Any, NoReturn, cast

from typing_extensions import TypedDict

from pants.engine.collection import Collection
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType, SideEffecting
from pants.engine.fs import (
    CreateDigest,
    Digest,
    DigestContents,
    DigestEntries,
    DigestSubset,
    Directory,
    FileContent,
    FileDigest,
    FileEntry,
    NativeDownloadFile,
    PathGlobs,
    PathGlobsAndRoot,
    PathMetadataRequest,
    PathMetadataResult,
    Paths,
    Snapshot,
    SymlinkEntry,
)
from pants.engine.goal import CurrentExecutingGoals, Goal
from pants.engine.internals import native_engine
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
    NativeParsedDockerfileInfo,
    NativeParsedJavascriptDependencies,
    NativeParsedPythonDependencies,
    ParsedJavascriptDependencyCandidate,
)
from pants.engine.internals.native_engine import (
    PyExecutionRequest,
    PyExecutionStrategyOptions,
    PyExecutor,
    PyLocalStoreOptions,
    PyRemotingOptions,
    PyScheduler,
    PySession,
    PySessionCancellationLatch,
    PyTasks,
    PyTypes,
)
from pants.engine.internals.nodes import Return, Throw
from pants.engine.internals.selectors import Params
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.platform import Platform
from pants.engine.process import (
    FallibleProcessResult,
    InteractiveProcess,
    InteractiveProcessResult,
    Process,
    ProcessResultMetadata,
)
from pants.engine.rules import Rule, RuleIndex, TaskRule
from pants.engine.unions import UnionMembership, is_union, union_in_scope_types
from pants.option.bootstrap_options import (
    LOCAL_STORE_LEASE_TIME_SECS,
    ExecutionOptions,
    LocalStoreOptions,
)
from pants.util.contextutil import temporary_file_path
from pants.util.logging import LogLevel
from pants.util.strutil import pluralize

logger = logging.getLogger(__name__)


Workunit = dict[str, Any]


class PolledWorkunits(TypedDict):
    started: tuple[Workunit, ...]
    completed: tuple[Workunit, ...]


@dataclass(frozen=True)
class ExecutionRequest:
    """Holds the roots for an execution, which might have been requested by a user.

    To create an ExecutionRequest, see `SchedulerSession.execution_request`.
    """

    native: PyExecutionRequest


class ExecutionError(Exception):
    def __init__(self, message, wrapped_exceptions=None):
        super().__init__(message)
        self.wrapped_exceptions = wrapped_exceptions or ()


class ExecutionTimeoutError(ExecutionError):
    """An ExecutionRequest specified a timeout which elapsed before the request completed."""


class Scheduler:
    def __init__(
        self,
        *,
        ignore_patterns: list[str],
        use_gitignore: bool,
        build_root: str,
        pants_workdir: str,
        local_execution_root_dir: str,
        named_caches_dir: str,
        ca_certs_path: str | None,
        rules: Iterable[Rule],
        union_membership: UnionMembership,
        execution_options: ExecutionOptions,
        local_store_options: LocalStoreOptions,
        executor: PyExecutor,
        include_trace_on_error: bool = True,
        visualize_to_dir: str | None = None,
        validate_reachability: bool = True,
        watch_filesystem: bool = True,
    ) -> None:
        """
        :param ignore_patterns: A list of gitignore-style file patterns for pants to ignore.
        :param use_gitignore: If set, pay attention to .gitignore files.
        :param build_root: The build root as a string.
        :param pants_workdir: The pants workdir as a string.
        :param local_execution_root_dir: The directory to use for local execution sandboxes.
        :param named_caches_dir: The directory to use as the root for named mutable caches.
        :param ca_certs_path: Path to pem file for custom CA, if needed.
        :param rules: A set of Rules which is used to compute values in the graph.
        :param union_membership: All the registered and normalized union rules.
        :param execution_options: Execution options for (remote) processes.
        :param local_store_options: Options for the engine's LMDB store(s).
        :param include_trace_on_error: Include the trace through the graph upon encountering errors.
        :param validate_reachability: True to assert that all rules in an otherwise successfully
          constructed rule graph are reachable: if a graph cannot be successfully constructed, it
          is always a fatal error.
        :param watch_filesystem: False if filesystem watching should be disabled.
        """
        self.include_trace_on_error = include_trace_on_error
        self._visualize_to_dir = visualize_to_dir
        self._visualize_run_count = 0
        # Validate and register all provided and intrinsic tasks.
        rule_index = RuleIndex.create(rules)
        tasks = register_rules(rule_index, union_membership)

        # Create the native Scheduler and Session.
        types = PyTypes(
            paths=Paths,
            path_metadata_request=PathMetadataRequest,
            path_metadata_result=PathMetadataResult,
            file_content=FileContent,
            file_entry=FileEntry,
            symlink_entry=SymlinkEntry,
            directory=Directory,
            digest_contents=DigestContents,
            digest_entries=DigestEntries,
            path_globs=PathGlobs,
            create_digest=CreateDigest,
            digest_subset=DigestSubset,
            native_download_file=NativeDownloadFile,
            platform=Platform,
            process=Process,
            process_result=FallibleProcessResult,
            process_result_metadata=ProcessResultMetadata,
            coroutine=CoroutineType,
            session_values=SessionValues,
            run_id=RunId,
            interactive_process=InteractiveProcess,
            interactive_process_result=InteractiveProcessResult,
            engine_aware_parameter=EngineAwareParameter,
            docker_resolve_image_request=DockerResolveImageRequest,
            docker_resolve_image_result=DockerResolveImageResult,
            parsed_python_deps_result=NativeParsedPythonDependencies,
            parsed_javascript_deps_result=NativeParsedJavascriptDependencies,
            parsed_dockerfile_info_result=NativeParsedDockerfileInfo,
            parsed_javascript_deps_candidate_result=ParsedJavascriptDependencyCandidate,
        )
        remoting_options = PyRemotingOptions(
            provider=execution_options.remote_provider.value,
            execution_enable=execution_options.remote_execution,
            store_headers=execution_options.remote_store_headers,
            store_chunk_bytes=execution_options.remote_store_chunk_bytes,
            store_rpc_retries=execution_options.remote_store_rpc_retries,
            store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
            store_rpc_timeout_millis=execution_options.remote_store_rpc_timeout_millis,
            store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
            store_batch_load_enabled=execution_options.remote_store_batch_load_enabled,
            cache_warnings_behavior=execution_options.remote_cache_warnings.value,
            cache_content_behavior=execution_options.cache_content_behavior.value,
            cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
            cache_rpc_timeout_millis=execution_options.remote_cache_rpc_timeout_millis,
            execution_headers=execution_options.remote_execution_headers,
            execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
            execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
            store_address=execution_options.remote_store_address,
            execution_address=execution_options.remote_execution_address,
            execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
            instance_name=execution_options.remote_instance_name,
            root_ca_certs_path=execution_options.remote_ca_certs_path,
            client_certs_path=execution_options.remote_client_certs_path,
            client_key_path=execution_options.remote_client_key_path,
            append_only_caches_base_path=execution_options.remote_execution_append_only_caches_base_path,
        )
        py_local_store_options = PyLocalStoreOptions(
            store_dir=local_store_options.store_dir,
            process_cache_max_size_bytes=local_store_options.processes_max_size_bytes,
            files_max_size_bytes=local_store_options.files_max_size_bytes,
            directories_max_size_bytes=local_store_options.directories_max_size_bytes,
            lease_time_millis=LOCAL_STORE_LEASE_TIME_SECS * 1000,
            shard_count=local_store_options.shard_count,
        )
        exec_strategy_opts = PyExecutionStrategyOptions(
            local_cache=execution_options.local_cache,
            remote_cache_read=execution_options.remote_cache_read,
            remote_cache_write=execution_options.remote_cache_write,
            use_sandboxer=execution_options.use_sandboxer,
            local_parallelism=execution_options.process_execution_local_parallelism,
            local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
            remote_parallelism=execution_options.process_execution_remote_parallelism,
            child_max_memory=execution_options.process_total_child_memory_usage or 0,
            child_default_memory=execution_options.process_per_child_memory_usage,
            graceful_shutdown_timeout=execution_options.process_execution_graceful_shutdown_timeout,
        )

        self._py_executor = executor
        self._py_scheduler = native_engine.scheduler_create(
            executor,
            tasks,
            types,
            build_root,
            pants_workdir,
            local_execution_root_dir,
            named_caches_dir,
            ignore_patterns,
            use_gitignore,
            watch_filesystem,
            remoting_options,
            py_local_store_options,
            exec_strategy_opts,
            ca_certs_path,
        )

        # If configured, visualize the rule graph before asserting that it is valid.
        if self._visualize_to_dir is not None:
            rule_graph_name = "rule_graph.dot"
            self.visualize_rule_graph_to_file(os.path.join(self._visualize_to_dir, rule_graph_name))

        if validate_reachability:
            native_engine.validate_reachability(self.py_scheduler)

    @property
    def py_scheduler(self) -> PyScheduler:
        return self._py_scheduler

    @property
    def py_executor(self) -> PyExecutor:
        return self._py_executor

    def _to_params_list(self, subject_or_params: Any | Params) -> Sequence[Any]:
        if isinstance(subject_or_params, Params):
            return subject_or_params.params
        return [subject_or_params]

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        native_engine.rule_graph_visualize(self.py_scheduler, filename)

    def visualize_rule_subgraph_to_file(
        self, filename: str, root_subject_types: list[type], product_type: type
    ) -> None:
        native_engine.rule_subgraph_visualize(
            self.py_scheduler, root_subject_types, product_type, filename
        )

    def rule_graph_visualization(self):
        with temporary_file_path() as path:
            self.visualize_rule_graph_to_file(path)
            with open(path) as fd:
                for line in fd.readlines():
                    yield line.rstrip()

    def rule_subgraph_visualization(self, root_subject_types: list[type], product_type: type):
        with temporary_file_path() as path:
            self.visualize_rule_subgraph_to_file(path, root_subject_types, product_type)
            with open(path) as fd:
                for line in fd.readlines():
                    yield line.rstrip()

    def rule_graph_consumed_types(
        self, root_subject_types: Sequence[type], product_type: type
    ) -> Sequence[type]:
        return native_engine.rule_graph_consumed_types(
            self.py_scheduler, root_subject_types, product_type
        )

    def invalidate_files(self, filenames: Iterable[str]) -> int:
        return native_engine.graph_invalidate_paths(self.py_scheduler, filenames)

    def invalidate_all_files(self) -> int:
        return native_engine.graph_invalidate_all_paths(self.py_scheduler)

    def invalidate_all(self) -> None:
        native_engine.graph_invalidate_all(self.py_scheduler)

    def check_invalidation_watcher_liveness(self) -> None:
        native_engine.check_invalidation_watcher_liveness(self.py_scheduler)

    def graph_len(self) -> int:
        return native_engine.graph_len(self.py_scheduler)

    def execution_add_root_select(
        self, execution_request: PyExecutionRequest, subject_or_params: Any | Params, product: type
    ) -> None:
        params = self._to_params_list(subject_or_params)
        native_engine.execution_add_root_select(
            self.py_scheduler, execution_request, params, product
        )

    @property
    def visualize_to_dir(self) -> str | None:
        return self._visualize_to_dir

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        native_engine.garbage_collect_store(self.py_scheduler, target_size_bytes)

    def new_session(
        self,
        build_id: str,
        dynamic_ui: bool = False,
        ui_use_prodash: bool = False,
        max_workunit_level: LogLevel = LogLevel.DEBUG,
        session_values: SessionValues | None = None,
        cancellation_latch: PySessionCancellationLatch | None = None,
    ) -> SchedulerSession:
        """Creates a new SchedulerSession for this Scheduler."""
        return SchedulerSession(
            self,
            PySession(
                scheduler=self.py_scheduler,
                dynamic_ui=dynamic_ui,
                ui_use_prodash=ui_use_prodash,
                max_workunit_level=max_workunit_level.level,
                build_id=build_id,
                session_values=session_values or SessionValues(),
                cancellation_latch=cancellation_latch or PySessionCancellationLatch(),
            ),
        )

    def shutdown(self, timeout_secs: int = 60) -> None:
        native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)


class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
    pass


class SchedulerSession:
    """A handle to a shared underlying Scheduler and a unique Session.

    Generally a Session corresponds to a single run of pants: some metrics are specific to a
    Session.
    """

    def __init__(self, scheduler: Scheduler, session: PySession) -> None:
        self._scheduler = scheduler
        self._py_session = session
        self._goals = session.session_values.get(CurrentExecutingGoals) or CurrentExecutingGoals()

    @property
    def scheduler(self) -> Scheduler:
        return self._scheduler

    @property
    def py_scheduler(self) -> PyScheduler:
        return self._scheduler.py_scheduler

    @property
    def py_session(self) -> PySession:
        return self._py_session

    def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
        return SchedulerSession(
            self._scheduler,
            native_engine.session_isolated_shallow_clone(self._py_session, build_id),
        )

    def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
        result = native_engine.session_poll_workunits(
            self.py_scheduler, self.py_session, max_log_verbosity.level
        )
        return {"started": result[0], "completed": result[1]}

    def new_run_id(self) -> None:
        """Assigns a new "run id" to this Session, without creating a new Session.

        Usually each Session corresponds to one end user "run", but there are exceptions: notably,
        the `--loop` feature uses one Session, but would like to observe new values for uncacheable
        nodes in each iteration of its loop.
        """
        native_engine.session_new_run_id(self.py_session)

    def visualize_graph_to_file(self, filename: str) -> None:
        """Visualize a graph walk by writing graphviz `dot` output to a file."""
        native_engine.graph_visualize(self.py_scheduler, self.py_session, filename)

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        self._scheduler.visualize_rule_graph_to_file(filename)

    def rule_graph_rule_gets(self) -> dict[Callable, list[tuple[type, list[type], Callable]]]:
        return native_engine.rule_graph_rule_gets(self.py_scheduler)

    def execution_request(
        self,
        requests: Sequence[tuple[type, Any | Params]],
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> ExecutionRequest:
        """Create and return an ExecutionRequest for the given (product, subject) pairs.

        :param requests: A sequence of product types to request for subjects.
        :param poll: True to wait for _all_ of the given roots to
          have changed since their last observed values in this SchedulerSession.
        :param poll_delay: A delay (in seconds) to wait after observing a change, and before
          beginning to compute a new value.
        :param timeout: An optional timeout to wait for the request to complete (in seconds). If the
          request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
        :returns: An ExecutionRequest for the given products and subjects.
        """
        native_execution_request = PyExecutionRequest(
            poll=poll,
            poll_delay_in_ms=int(poll_delay * 1000) if poll_delay else None,
            timeout_in_ms=int(timeout * 1000) if timeout else None,
        )
        for product, subject in requests:
            self._scheduler.execution_add_root_select(native_execution_request, subject, product)
        return ExecutionRequest(native_execution_request)

    def invalidate_files(self, direct_filenames: Iterable[str]) -> int:
        """Invalidates the given filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_files(direct_filenames)
        self._maybe_visualize()
        return invalidated

    def invalidate_all_files(self) -> int:
        """Invalidates all filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_all_files()
        self._maybe_visualize()
        return invalidated

    def metrics(self) -> dict[str, int]:
        """Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
        return native_engine.scheduler_metrics(self.py_scheduler, self.py_session)

    def live_items(self) -> tuple[list[Any], dict[str, tuple[int, int]]]:
        """Return all Python objects held by the Scheduler."""
        return native_engine.scheduler_live_items(self.py_scheduler, self.py_session)

    def _maybe_visualize(self) -> None:
        if self._scheduler.visualize_to_dir is not None:
            # TODO: This increment-and-get is racey.
            name = f"graph.{self._scheduler._visualize_run_count:03d}.dot"
            self._scheduler._visualize_run_count += 1
            logger.info(f"Visualizing graph as {name}")
            self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))

    def teardown_dynamic_ui(self) -> None:
        native_engine.teardown_dynamic_ui(self.py_scheduler, self.py_session)

    def _execute(
        self, execution_request: ExecutionRequest
    ) -> tuple[tuple[Return, ...], tuple[Throw, ...]]:
        start_time = time.time()
        try:
            raw_roots = native_engine.scheduler_execute(
                self.py_scheduler,
                self.py_session,
                execution_request.native,
            )
        except native_engine.PollTimeout:
            raise ExecutionTimeoutError("Timed out")

        states = [
            (
                Throw(
                    raw_root.result,
                    python_traceback=raw_root.python_traceback,
                    engine_traceback=raw_root.engine_traceback,
                )
                if raw_root.is_throw
                else Return(raw_root.result)
            )
            for raw_root in raw_roots
        ]

        self._maybe_visualize()
        logger.debug(
            "computed %s nodes in %f seconds. there are %s total nodes.",
            len(states),
            time.time() - start_time,
            self._scheduler.graph_len(),
        )

        returns = tuple(state for state in states if isinstance(state, Return))
        throws = tuple(state for state in states if isinstance(state, Throw))
        return returns, throws

    def _raise_on_error(self, throws: Sequence[Throw]) -> NoReturn:
        exception_noun = pluralize(len(throws), "Exception")
        others_msg = f"\n(and {len(throws) - 1} more)\n" if len(throws) > 1 else ""
        raise ExecutionError(
            f"{exception_noun} encountered:\n\n"
            f"{throws[0].render(self._scheduler.include_trace_on_error)}\n"
            f"{others_msg}",
            wrapped_exceptions=tuple(t.exc for t in throws),
        )

    def execute(self, execution_request: ExecutionRequest) -> list[Any]:
        """Invoke the engine for the given ExecutionRequest, returning successful values or raising.

        :return: A sequence of per-request results.
        """
        returns, throws = self._execute(execution_request)

        # Throw handling.
        if throws:
            self._raise_on_error(throws)

        # Everything is a Return: we rely on the fact that roots are ordered to preserve subject
        # order in output lists.
        return [ret.value for ret in returns]

    def run_goal_rule(
        self,
        product: type[Goal],
        subject: Params,
        *,
        poll: bool = False,
        poll_delay: float | None = None,
    ) -> int:
        """
        :param product: A Goal subtype.
        :param subject: subject for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :returns: An exit_code for the given Goal.
        """
        if self._scheduler.visualize_to_dir is not None:
            rule_graph_name = f"rule_graph.{product.name}.dot"
            params = self._scheduler._to_params_list(subject)
            self._scheduler.visualize_rule_subgraph_to_file(
                os.path.join(self._scheduler.visualize_to_dir, rule_graph_name),
                [type(p) for p in params],
                product,
            )
        with self._goals._execute(product):
            (return_value,) = self.product_request(
                product, subject, poll=poll, poll_delay=poll_delay
            )
        return cast(int, return_value.exit_code)

    def product_request(
        self,
        product: type,
        subject: Any | Params,
        *,
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> list:
        """Executes a request for a single product for a subject, and returns the products.

        :param product: A product type for the request.
        :param subject: A subject or Params instance for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :param timeout: See self.execution_request.
        :returns: A list of the requested products, with length match len(subjects).
        """
        request = self.execution_request(
            [(product, subject)],
            poll=poll,
            poll_delay=poll_delay,
            timeout=timeout,
        )
        return self.execute(request)

    def capture_snapshots(self, path_globs_and_roots: Iterable[PathGlobsAndRoot]) -> list[Snapshot]:
        """Synchronously captures Snapshots for each matching PathGlobs rooted at a its root
        directory.

        This is a blocking operation, and should be avoided where possible.
        """
        return native_engine.capture_snapshots(
            self.py_scheduler,
            self.py_session,
            _PathGlobsAndRootCollection(path_globs_and_roots),
        )

    def single_file_digests_to_bytes(self, digests: Sequence[FileDigest]) -> list[bytes]:
        return native_engine.single_file_digests_to_bytes(self.py_scheduler, list(digests))

    def snapshots_to_file_contents(
        self, snapshots: Sequence[Snapshot]
    ) -> tuple[DigestContents, ...]:
        """For each input `Snapshot`, yield a single `DigestContents` containing all the
        `FileContent`s corresponding to the file(s) contained within that `Snapshot`.

        Note that we cannot currently use a parallelized version of `self.product_request` since
        each snapshot needs to yield a separate `DigestContents`.
        """
        return tuple(
            self.product_request(DigestContents, snapshot.digest)[0] for snapshot in snapshots
        )

    def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
        native_engine.ensure_remote_has_recursive(self.py_scheduler, list(digests))

    def ensure_directory_digest_persisted(self, digest: Digest) -> None:
        native_engine.ensure_directory_digest_persisted(self.py_scheduler, digest)

    def write_digest(
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
    ) -> None:
        """Write a digest to disk, relative to the build root."""
        if path_prefix and PurePath(path_prefix).is_absolute():
            raise ValueError(
                f"The `path_prefix` {path_prefix} must be a relative path, as the engine writes "
                "the digest relative to the build root."
            )
        native_engine.write_digest(
            self.py_scheduler, self.py_session, digest, path_prefix or "", clear_paths
        )

    def lease_files_in_graph(self) -> None:
        native_engine.lease_files_in_graph(self.py_scheduler, self.py_session)

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        self._scheduler.garbage_collect_store(target_size_bytes)

    def get_metrics(self) -> dict[str, int]:
        return native_engine.session_get_metrics(self.py_session)

    def get_observation_histograms(self) -> dict[str, Any]:
        return native_engine.session_get_observation_histograms(self.py_scheduler, self.py_session)

    def record_test_observation(self, value: int) -> None:
        native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

    @property
    def is_cancelled(self) -> bool:
        return self.py_session.is_cancelled()

    def cancel(self) -> None:
        self.py_session.cancel()

    def wait_for_tail_tasks(self, timeout: float) -> None:
        native_engine.session_wait_for_tail_tasks(self.py_scheduler, self.py_session, timeout)


def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
    """Create a native Tasks object loaded with given RuleIndex."""
    tasks = PyTasks()

    # Compute a reverse index of union membership.
    member_type_to_base_types = defaultdict(list)
    for base_type, member_types in union_membership.items():
        for member_type in member_types:
            member_type_to_base_types[member_type].append(base_type)

    # Compute map from union base type to rules that have one of its member types as a param.
    # The value is a list of pairs (rule, member type in that rule's params).
    base_type_to_member_rule_type_pairs: dict[type, list[tuple[TaskRule, type[Any]]]] = defaultdict(
        list
    )
    rule_id_to_rule: dict[str, TaskRule] = {}
    for task_rule in rule_index.rules:
        rule_id_to_rule[task_rule.canonical_name] = task_rule
        for param_type in task_rule.parameters.values():
            for base_type in member_type_to_base_types.get(param_type, tuple()):
                base_type_to_member_rule_type_pairs[base_type].append((task_rule, param_type))

    def register_task(rule: TaskRule) -> None:
        native_engine.tasks_task_begin(
            tasks,
            rule.func,
            rule.output_type,
            tuple(rule.parameters.items()),
            rule.masked_types,
            side_effecting=any(issubclass(t, SideEffecting) for t in rule.parameters.values()),
            engine_aware_return_type=issubclass(rule.output_type, EngineAwareReturnType),
            cacheable=rule.cacheable,
            name=rule.canonical_name,
            desc=rule.desc or "",
            level=rule.level.level,
        )

        for awaitable in rule.awaitables:
            unions = [t for t in awaitable.input_types if is_union(t)]
            if len(unions) == 1:
                union = unions[0]
                if awaitable.rule_id:
                    if rule_id_to_rule[awaitable.rule_id].polymorphic:
                        # This is a polymorphic call-by-name. Compute its vtable data, i.e., a list
                        # of pairs (union member type, id of implementation rule for that type).
                        member_rule_type_pairs = base_type_to_member_rule_type_pairs.get(union)
                        vtable_entries: list[tuple[type[Any], str]] = []
                        for member_rule, member_type in member_rule_type_pairs or []:
                            # If a rule has a union member as a param, and returns the relevant
                            # output type, then we take it to be the implementation of the
                            # polymorphic rule for the union member.
                            if member_rule.output_type == awaitable.output_type:
                                vtable_entries.append((member_type, member_rule.canonical_name))
                        in_scope_types = union_in_scope_types(union)
                        assert in_scope_types is not None
                        native_engine.tasks_add_call(
                            tasks,
                            awaitable.output_type,
                            awaitable.input_types,
                            awaitable.rule_id,
                            awaitable.explicit_args_arity,
                            vtable_entries,
                            in_scope_types,
                        )
                else:
                    # This is a union Get.
                    # Register the union by recording a copy of the Get for each union member.
                    in_scope_types = union_in_scope_types(union)
                    assert in_scope_types is not None
                    for union_member in union_membership.get(union):
                        native_engine.tasks_add_get_union(
                            tasks,
                            awaitable.output_type,
                            tuple(union_member if t == union else t for t in awaitable.input_types),
                            in_scope_types,
                        )
            elif len(unions) > 1:
                raise TypeError(
                    f"Only one @union may be used in a call or Get, but {awaitable} used: {unions}."
                )
            elif awaitable.rule_id is not None:
                # Is a non-polymorphic call to a known rule.
                native_engine.tasks_add_call(
                    tasks,
                    awaitable.output_type,
                    awaitable.input_types,
                    awaitable.rule_id,
                    awaitable.explicit_args_arity,
                    vtable_entries=None,
                    in_scope_types=None,
                )
            else:
                # Otherwise, the Get subject is a "concrete" type, so add a single Get edge.
                native_engine.tasks_add_get(tasks, awaitable.output_type, awaitable.input_types)

        native_engine.tasks_task_end(tasks)

    for task_rule in rule_index.rules:
        register_task(task_rule)
    for query in rule_index.queries:
        native_engine.tasks_add_query(
            tasks,
            query.output_type,
            query.input_types,
        )
    return tasks