• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20333307239

18 Dec 2025 10:07AM UTC coverage: 75.452% (-4.8%) from 80.295%
20333307239

Pull #22949

github

web-flow
Merge b07232683 into 407284c67
Pull Request #22949: Add experimental uv resolver for Python lockfiles

51 of 96 new or added lines in 5 files covered. (53.13%)

2857 existing lines in 120 files now uncovered.

66315 of 87890 relevant lines covered (75.45%)

2.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.4
/src/python/pants/engine/internals/scheduler.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
9✔
5

6
import logging
9✔
7
import os
9✔
8
import time
9✔
9
from collections import defaultdict
9✔
10
from collections.abc import Callable, Iterable, Sequence
9✔
11
from dataclasses import dataclass
9✔
12
from pathlib import PurePath
9✔
13
from types import CoroutineType
9✔
14
from typing import Any, NoReturn, cast
9✔
15

16
from typing_extensions import TypedDict
9✔
17

18
from pants.engine.collection import Collection
9✔
19
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType, SideEffecting
9✔
20
from pants.engine.fs import (
9✔
21
    CreateDigest,
22
    Digest,
23
    DigestContents,
24
    DigestEntries,
25
    DigestSubset,
26
    Directory,
27
    FileContent,
28
    FileDigest,
29
    FileEntry,
30
    NativeDownloadFile,
31
    PathGlobs,
32
    PathGlobsAndRoot,
33
    PathMetadataRequest,
34
    PathMetadataResult,
35
    Paths,
36
    Snapshot,
37
    SymlinkEntry,
38
)
39
from pants.engine.goal import CurrentExecutingGoals, Goal
9✔
40
from pants.engine.internals import native_engine
9✔
41
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
9✔
42
from pants.engine.internals.native_dep_inference import (
9✔
43
    NativeParsedDockerfileInfo,
44
    NativeParsedJavascriptDependencies,
45
    NativeParsedPythonDependencies,
46
    ParsedJavascriptDependencyCandidate,
47
)
48
from pants.engine.internals.native_engine import (
9✔
49
    PyExecutionRequest,
50
    PyExecutionStrategyOptions,
51
    PyExecutor,
52
    PyLocalStoreOptions,
53
    PyRemotingOptions,
54
    PyScheduler,
55
    PySession,
56
    PySessionCancellationLatch,
57
    PyTasks,
58
    PyTypes,
59
)
60
from pants.engine.internals.nodes import Return, Throw
9✔
61
from pants.engine.internals.selectors import Params
9✔
62
from pants.engine.internals.session import RunId, SessionValues
9✔
63
from pants.engine.platform import Platform
9✔
64
from pants.engine.process import (
9✔
65
    FallibleProcessResult,
66
    InteractiveProcess,
67
    InteractiveProcessResult,
68
    Process,
69
    ProcessResultMetadata,
70
)
71
from pants.engine.rules import Rule, RuleIndex, TaskRule
9✔
72
from pants.engine.unions import UnionMembership, is_union, union_in_scope_types
9✔
73
from pants.option.bootstrap_options import (
9✔
74
    LOCAL_STORE_LEASE_TIME_SECS,
75
    ExecutionOptions,
76
    LocalStoreOptions,
77
)
78
from pants.util.contextutil import temporary_file_path
9✔
79
from pants.util.logging import LogLevel
9✔
80
from pants.util.strutil import pluralize
9✔
81

82
# Module-level logger for the scheduler internals.
logger: logging.Logger = logging.getLogger(__name__)


# A single workunit as handed back by the native engine: a plain string-keyed dict.
Workunit = dict[str, Any]
86

87

88
class PolledWorkunits(TypedDict):
    """The result of one `SchedulerSession.poll_workunits` call, split by workunit state."""

    # Workunits the engine reported as started (presumably since the previous poll — confirm).
    started: tuple[Workunit, ...]
    # Workunits the engine reported as completed (presumably since the previous poll — confirm).
    completed: tuple[Workunit, ...]
91

92

93
@dataclass(frozen=True)
class ExecutionRequest:
    """An immutable handle to the roots of one execution, possibly requested by a user.

    Build instances through `SchedulerSession.execution_request` rather than directly.
    """

    # The native request object that the roots were registered on.
    native: PyExecutionRequest
101

102

103
class ExecutionError(Exception):
    """Error raised when engine execution produces failures.

    `wrapped_exceptions` holds the underlying exceptions that were passed in. It is an empty
    tuple when nothing (or an empty collection) was given.
    """

    def __init__(self, message, wrapped_exceptions=None):
        super().__init__(message)
        if wrapped_exceptions:
            self.wrapped_exceptions = wrapped_exceptions
        else:
            self.wrapped_exceptions = ()


class ExecutionTimeoutError(ExecutionError):
    """An ExecutionRequest specified a timeout which elapsed before the request completed."""
111

112

113
class Scheduler:
    """Owns the native engine scheduler (`PyScheduler`) built from a set of rules.

    `SchedulerSession`s are created from it via `new_session` and share it.
    """

    def __init__(
        self,
        *,
        ignore_patterns: list[str],
        use_gitignore: bool,
        build_root: str,
        pants_workdir: str,
        local_execution_root_dir: str,
        named_caches_dir: str,
        ca_certs_path: str | None,
        rules: Iterable[Rule],
        union_membership: UnionMembership,
        execution_options: ExecutionOptions,
        local_store_options: LocalStoreOptions,
        executor: PyExecutor,
        include_trace_on_error: bool = True,
        visualize_to_dir: str | None = None,
        validate_reachability: bool = True,
        watch_filesystem: bool = True,
    ) -> None:
        """
        :param ignore_patterns: A list of gitignore-style file patterns for pants to ignore.
        :param use_gitignore: If set, pay attention to .gitignore files.
        :param build_root: The build root as a string.
        :param pants_workdir: The pants workdir as a string.
        :param local_execution_root_dir: The directory to use for local execution sandboxes.
        :param named_caches_dir: The directory to use as the root for named mutable caches.
        :param ca_certs_path: Path to pem file for custom CA, if needed.
        :param rules: A set of Rules which is used to compute values in the graph.
        :param union_membership: All the registered and normalized union rules.
        :param execution_options: Execution options for (remote) processes.
        :param local_store_options: Options for the engine's LMDB store(s).
        :param executor: The native executor the engine runs on; also exposed as `py_executor`.
        :param include_trace_on_error: Include the trace through the graph upon encountering errors.
        :param visualize_to_dir: If set, a directory into which `dot` visualizations of the rule
          graph (and, via `SchedulerSession`, of graph walks) are written.
        :param validate_reachability: True to assert that all rules in an otherwise successfully
          constructed rule graph are reachable: if a graph cannot be successfully constructed, it
          is always a fatal error.
        :param watch_filesystem: False if filesystem watching should be disabled.
        """
        self.include_trace_on_error = include_trace_on_error
        self._visualize_to_dir = visualize_to_dir
        # Counter used by SchedulerSession to give each graph visualization a unique file name.
        self._visualize_run_count = 0
        # Validate and register all provided and intrinsic tasks.
        rule_index = RuleIndex.create(rules)
        tasks = register_rules(rule_index, union_membership)

        # Create the native Scheduler and Session.
        types = PyTypes(
            paths=Paths,
            path_metadata_request=PathMetadataRequest,
            path_metadata_result=PathMetadataResult,
            file_content=FileContent,
            file_entry=FileEntry,
            symlink_entry=SymlinkEntry,
            directory=Directory,
            digest_contents=DigestContents,
            digest_entries=DigestEntries,
            path_globs=PathGlobs,
            create_digest=CreateDigest,
            digest_subset=DigestSubset,
            native_download_file=NativeDownloadFile,
            platform=Platform,
            process=Process,
            process_result=FallibleProcessResult,
            process_result_metadata=ProcessResultMetadata,
            coroutine=CoroutineType,
            session_values=SessionValues,
            run_id=RunId,
            interactive_process=InteractiveProcess,
            interactive_process_result=InteractiveProcessResult,
            engine_aware_parameter=EngineAwareParameter,
            docker_resolve_image_request=DockerResolveImageRequest,
            docker_resolve_image_result=DockerResolveImageResult,
            parsed_python_deps_result=NativeParsedPythonDependencies,
            parsed_javascript_deps_result=NativeParsedJavascriptDependencies,
            parsed_dockerfile_info_result=NativeParsedDockerfileInfo,
            parsed_javascript_deps_candidate_result=ParsedJavascriptDependencyCandidate,
        )
        remoting_options = PyRemotingOptions(
            provider=execution_options.remote_provider.value,
            execution_enable=execution_options.remote_execution,
            store_headers=execution_options.remote_store_headers,
            store_chunk_bytes=execution_options.remote_store_chunk_bytes,
            store_rpc_retries=execution_options.remote_store_rpc_retries,
            store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
            store_rpc_timeout_millis=execution_options.remote_store_rpc_timeout_millis,
            store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
            store_batch_load_enabled=execution_options.remote_store_batch_load_enabled,
            cache_warnings_behavior=execution_options.remote_cache_warnings.value,
            cache_content_behavior=execution_options.cache_content_behavior.value,
            cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
            cache_rpc_timeout_millis=execution_options.remote_cache_rpc_timeout_millis,
            execution_headers=execution_options.remote_execution_headers,
            execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
            execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
            store_address=execution_options.remote_store_address,
            execution_address=execution_options.remote_execution_address,
            execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
            instance_name=execution_options.remote_instance_name,
            root_ca_certs_path=execution_options.remote_ca_certs_path,
            client_certs_path=execution_options.remote_client_certs_path,
            client_key_path=execution_options.remote_client_key_path,
            append_only_caches_base_path=execution_options.remote_execution_append_only_caches_base_path,
        )
        py_local_store_options = PyLocalStoreOptions(
            store_dir=local_store_options.store_dir,
            process_cache_max_size_bytes=local_store_options.processes_max_size_bytes,
            files_max_size_bytes=local_store_options.files_max_size_bytes,
            directories_max_size_bytes=local_store_options.directories_max_size_bytes,
            lease_time_millis=LOCAL_STORE_LEASE_TIME_SECS * 1000,
            shard_count=local_store_options.shard_count,
        )
        exec_strategy_opts = PyExecutionStrategyOptions(
            local_cache=execution_options.local_cache,
            remote_cache_read=execution_options.remote_cache_read,
            remote_cache_write=execution_options.remote_cache_write,
            use_sandboxer=execution_options.use_sandboxer,
            local_parallelism=execution_options.process_execution_local_parallelism,
            local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
            remote_parallelism=execution_options.process_execution_remote_parallelism,
            child_max_memory=execution_options.process_total_child_memory_usage or 0,
            child_default_memory=execution_options.process_per_child_memory_usage,
            graceful_shutdown_timeout=execution_options.process_execution_graceful_shutdown_timeout,
        )

        self._py_executor = executor
        self._py_scheduler = native_engine.scheduler_create(
            executor,
            tasks,
            types,
            build_root,
            pants_workdir,
            local_execution_root_dir,
            named_caches_dir,
            ignore_patterns,
            use_gitignore,
            watch_filesystem,
            remoting_options,
            py_local_store_options,
            exec_strategy_opts,
            ca_certs_path,
        )

        # If configured, visualize the rule graph before asserting that it is valid.
        if self._visualize_to_dir is not None:
            rule_graph_name = "rule_graph.dot"
            self.visualize_rule_graph_to_file(os.path.join(self._visualize_to_dir, rule_graph_name))

        if validate_reachability:
            native_engine.validate_reachability(self.py_scheduler)

    @property
    def py_scheduler(self) -> PyScheduler:
        """The native scheduler handle created in `__init__`."""
        return self._py_scheduler

    @property
    def py_executor(self) -> PyExecutor:
        """The native executor this Scheduler was constructed with."""
        return self._py_executor

    def _to_params_list(self, subject_or_params: Any | Params) -> Sequence[Any]:
        """Normalize a lone subject or a `Params` instance into a sequence of params."""
        if isinstance(subject_or_params, Params):
            return subject_or_params.params
        return [subject_or_params]

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        """Write a `dot` rendering of the full rule graph to `filename`."""
        native_engine.rule_graph_visualize(self.py_scheduler, filename)

    def visualize_rule_subgraph_to_file(
        self, filename: str, root_subject_types: list[type], product_type: type
    ) -> None:
        """Write a `dot` rendering of the rule subgraph that computes `product_type` from
        `root_subject_types` to `filename`."""
        native_engine.rule_subgraph_visualize(
            self.py_scheduler, root_subject_types, product_type, filename
        )

    def _visualization_lines(self, write_to: Callable[[str], None]) -> Iterable[str]:
        """Yield the lines, right-stripped, of a visualization written by `write_to`.

        `write_to` is called with the path of a file obtained from `temporary_file_path`. The
        file is then read back lazily, one line at a time, while that context is still open.
        """
        with temporary_file_path() as path:
            write_to(path)
            # Decode explicitly instead of using the locale's default encoding, so the result
            # doesn't vary by locale (assumes the Rust engine writes UTF-8 text).
            with open(path, encoding="utf-8") as fd:
                for line in fd:
                    yield line.rstrip()

    def rule_graph_visualization(self) -> Iterable[str]:
        """Yield the lines of a `dot` rendering of the full rule graph."""
        yield from self._visualization_lines(self.visualize_rule_graph_to_file)

    def rule_subgraph_visualization(
        self, root_subject_types: list[type], product_type: type
    ) -> Iterable[str]:
        """Yield the lines of a `dot` rendering of the subgraph for `product_type`."""

        def write_to(path: str) -> None:
            self.visualize_rule_subgraph_to_file(path, root_subject_types, product_type)

        yield from self._visualization_lines(write_to)

    def rule_graph_consumed_types(
        self, root_subject_types: Sequence[type], product_type: type
    ) -> Sequence[type]:
        """Return the types the engine reports as consumed when computing `product_type` from
        `root_subject_types`."""
        return native_engine.rule_graph_consumed_types(
            self.py_scheduler, root_subject_types, product_type
        )

    def invalidate_files(self, filenames: Iterable[str]) -> int:
        """Invalidate graph entries for `filenames`; returns the engine-reported count."""
        return native_engine.graph_invalidate_paths(self.py_scheduler, filenames)

    def invalidate_all_files(self) -> int:
        """Invalidate graph entries for all paths; returns the engine-reported count."""
        return native_engine.graph_invalidate_all_paths(self.py_scheduler)

    def invalidate_all(self) -> None:
        """Invalidate the entire graph."""
        native_engine.graph_invalidate_all(self.py_scheduler)

    def check_invalidation_watcher_liveness(self) -> None:
        """Ask the engine to check that its filesystem invalidation watcher is still alive."""
        native_engine.check_invalidation_watcher_liveness(self.py_scheduler)

    def graph_len(self) -> int:
        """Return the total number of nodes in the engine's graph."""
        return native_engine.graph_len(self.py_scheduler)

    def execution_add_root_select(
        self, execution_request: PyExecutionRequest, subject_or_params: Any | Params, product: type
    ) -> None:
        """Add a root requesting `product` for the given subject/params to `execution_request`."""
        params = self._to_params_list(subject_or_params)
        native_engine.execution_add_root_select(
            self.py_scheduler, execution_request, params, product
        )

    @property
    def visualize_to_dir(self) -> str | None:
        """The directory visualizations are written to, or None if visualization is disabled."""
        return self._visualize_to_dir

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        """Ask the engine to garbage collect its store toward `target_size_bytes`."""
        native_engine.garbage_collect_store(self.py_scheduler, target_size_bytes)

    def new_session(
        self,
        build_id: str,
        dynamic_ui: bool = False,
        ui_use_prodash: bool = False,
        max_workunit_level: LogLevel = LogLevel.DEBUG,
        session_values: SessionValues | None = None,
        cancellation_latch: PySessionCancellationLatch | None = None,
    ) -> SchedulerSession:
        """Creates a new SchedulerSession for this Scheduler.

        Missing `session_values` / `cancellation_latch` are replaced with fresh, empty instances.
        """
        return SchedulerSession(
            self,
            PySession(
                scheduler=self.py_scheduler,
                dynamic_ui=dynamic_ui,
                ui_use_prodash=ui_use_prodash,
                max_workunit_level=max_workunit_level.level,
                build_id=build_id,
                session_values=session_values or SessionValues(),
                cancellation_latch=cancellation_latch or PySessionCancellationLatch(),
            ),
        )

    def shutdown(self, timeout_secs: int = 60) -> None:
        """Shut down the native scheduler; `timeout_secs` is passed to the engine as its timeout."""
        native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)
363

364

365
class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
    """A `Collection` of `PathGlobsAndRoot`, as handed to `native_engine.capture_snapshots`."""
367

368

369
class SchedulerSession:
    """A handle to a shared underlying Scheduler and a unique Session.

    Generally a Session corresponds to a single run of pants: some metrics are specific to a
    Session.
    """

    def __init__(self, scheduler: Scheduler, session: PySession) -> None:
        self._scheduler = scheduler
        self._py_session = session
        # Reuse the CurrentExecutingGoals from the session values if present, else start fresh.
        self._goals = session.session_values.get(CurrentExecutingGoals) or CurrentExecutingGoals()

    @property
    def scheduler(self) -> Scheduler:
        """The Scheduler this session belongs to."""
        return self._scheduler

    @property
    def py_scheduler(self) -> PyScheduler:
        """The native scheduler handle of the owning Scheduler."""
        return self._scheduler.py_scheduler

    @property
    def py_session(self) -> PySession:
        """The native session handle wrapped by this object."""
        return self._py_session

    def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
        """Return a new SchedulerSession on the same Scheduler, wrapping the native session
        produced by `native_engine.session_isolated_shallow_clone` for `build_id`."""
        return SchedulerSession(
            self._scheduler,
            native_engine.session_isolated_shallow_clone(self._py_session, build_id),
        )

    def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
        """Poll the engine for started and completed workunits up to `max_log_verbosity`."""
        result = native_engine.session_poll_workunits(
            self.py_scheduler, self.py_session, max_log_verbosity.level
        )
        return {"started": result[0], "completed": result[1]}

    def new_run_id(self) -> None:
        """Assigns a new "run id" to this Session, without creating a new Session.

        Usually each Session corresponds to one end user "run", but there are exceptions: notably,
        the `--loop` feature uses one Session, but would like to observe new values for uncacheable
        nodes in each iteration of its loop.
        """
        native_engine.session_new_run_id(self.py_session)

    def visualize_graph_to_file(self, filename: str) -> None:
        """Visualize a graph walk by writing graphviz `dot` output to a file."""
        native_engine.graph_visualize(self.py_scheduler, self.py_session, filename)

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        """Write a `dot` rendering of the owning Scheduler's rule graph to `filename`."""
        self._scheduler.visualize_rule_graph_to_file(filename)

    def rule_graph_rule_gets(self) -> dict[Callable, list[tuple[type, list[type], Callable]]]:
        """Return the engine's mapping from each rule function to its gets."""
        return native_engine.rule_graph_rule_gets(self.py_scheduler)

    def execution_request(
        self,
        requests: Sequence[tuple[type, Any | Params]],
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> ExecutionRequest:
        """Create and return an ExecutionRequest for the given (product, subject) pairs.

        :param requests: A sequence of (product type, subject or Params) pairs; each pair becomes
          one root of the request.
        :param poll: True to wait for _all_ of the given roots to
          have changed since their last observed values in this SchedulerSession.
        :param poll_delay: A delay (in seconds) to wait after observing a change, and before
          beginning to compute a new value.
        :param timeout: An optional timeout to wait for the request to complete (in seconds). If the
          request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
        :returns: An ExecutionRequest for the given products and subjects.

        Note: a `poll_delay` or `timeout` of 0 is treated the same as None (unset).
        """
        native_execution_request = PyExecutionRequest(
            poll=poll,
            poll_delay_in_ms=int(poll_delay * 1000) if poll_delay else None,
            timeout_in_ms=int(timeout * 1000) if timeout else None,
        )
        for product, subject in requests:
            self._scheduler.execution_add_root_select(native_execution_request, subject, product)
        return ExecutionRequest(native_execution_request)

    def invalidate_files(self, direct_filenames: Iterable[str]) -> int:
        """Invalidates the given filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_files(direct_filenames)
        self._maybe_visualize()
        return invalidated

    def invalidate_all_files(self) -> int:
        """Invalidates all filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_all_files()
        self._maybe_visualize()
        return invalidated

    def metrics(self) -> dict[str, int]:
        """Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
        return native_engine.scheduler_metrics(self.py_scheduler, self.py_session)

    def live_items(self) -> tuple[list[Any], dict[str, tuple[int, int]]]:
        """Return all Python objects held by the Scheduler."""
        return native_engine.scheduler_live_items(self.py_scheduler, self.py_session)

    def _maybe_visualize(self) -> None:
        """Write a numbered graph visualization if the Scheduler has a `visualize_to_dir`."""
        if self._scheduler.visualize_to_dir is not None:
            # TODO: This increment-and-get is racey.
            name = f"graph.{self._scheduler._visualize_run_count:03d}.dot"
            self._scheduler._visualize_run_count += 1
            logger.info(f"Visualizing graph as {name}")
            self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))

    def teardown_dynamic_ui(self) -> None:
        """Tear down the engine's dynamic UI for this session."""
        native_engine.teardown_dynamic_ui(self.py_scheduler, self.py_session)

    def _execute(
        self, execution_request: ExecutionRequest
    ) -> tuple[tuple[Return, ...], tuple[Throw, ...]]:
        """Run `execution_request` and partition its root results into (returns, throws).

        :raises ExecutionTimeoutError: if the engine reports a `PollTimeout`.
        """
        start_time = time.time()
        try:
            raw_roots = native_engine.scheduler_execute(
                self.py_scheduler,
                self.py_session,
                execution_request.native,
            )
        except native_engine.PollTimeout:
            raise ExecutionTimeoutError("Timed out")

        states = [
            (
                Throw(
                    raw_root.result,
                    python_traceback=raw_root.python_traceback,
                    engine_traceback=raw_root.engine_traceback,
                )
                if raw_root.is_throw
                else Return(raw_root.result)
            )
            for raw_root in raw_roots
        ]

        self._maybe_visualize()
        # NOTE: `len(states)` is the number of requested roots, not of all computed graph nodes.
        logger.debug(
            "computed %s nodes in %f seconds. there are %s total nodes.",
            len(states),
            time.time() - start_time,
            self._scheduler.graph_len(),
        )

        returns = tuple(state for state in states if isinstance(state, Return))
        throws = tuple(state for state in states if isinstance(state, Throw))
        return returns, throws

    def _raise_on_error(self, throws: Sequence[Throw]) -> NoReturn:
        """Raise an ExecutionError rendering the first Throw and wrapping all their exceptions."""
        exception_noun = pluralize(len(throws), "Exception")
        others_msg = f"\n(and {len(throws) - 1} more)\n" if len(throws) > 1 else ""
        raise ExecutionError(
            f"{exception_noun} encountered:\n\n"
            f"{throws[0].render(self._scheduler.include_trace_on_error)}\n"
            f"{others_msg}",
            wrapped_exceptions=tuple(t.exc for t in throws),
        )

    def execute(self, execution_request: ExecutionRequest) -> list[Any]:
        """Invoke the engine for the given ExecutionRequest, returning successful values or raising.

        :return: A sequence of per-request results.
        :raises ExecutionError: if any root failed.
        """
        returns, throws = self._execute(execution_request)

        # Throw handling.
        if throws:
            self._raise_on_error(throws)

        # Everything is a Return: we rely on the fact that roots are ordered to preserve subject
        # order in output lists.
        return [ret.value for ret in returns]

    def run_goal_rule(
        self,
        product: type[Goal],
        subject: Params,
        *,
        poll: bool = False,
        poll_delay: float | None = None,
    ) -> int:
        """
        :param product: A Goal subtype.
        :param subject: subject for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :returns: An exit_code for the given Goal.
        """
        if self._scheduler.visualize_to_dir is not None:
            rule_graph_name = f"rule_graph.{product.name}.dot"
            params = self._scheduler._to_params_list(subject)
            self._scheduler.visualize_rule_subgraph_to_file(
                os.path.join(self._scheduler.visualize_to_dir, rule_graph_name),
                [type(p) for p in params],
                product,
            )
        with self._goals._execute(product):
            (return_value,) = self.product_request(
                product, subject, poll=poll, poll_delay=poll_delay
            )
        return cast(int, return_value.exit_code)

    def product_request(
        self,
        product: type,
        subject: Any | Params,
        *,
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> list:
        """Executes a request for a single product for a subject, and returns the products.

        :param product: A product type for the request.
        :param subject: A subject or Params instance for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :param timeout: See self.execution_request.
        :returns: A one-element list containing the requested product (a single root is
          requested).
        """
        request = self.execution_request(
            [(product, subject)],
            poll=poll,
            poll_delay=poll_delay,
            timeout=timeout,
        )
        return self.execute(request)

    def capture_snapshots(self, path_globs_and_roots: Iterable[PathGlobsAndRoot]) -> list[Snapshot]:
        """Synchronously captures Snapshots for each matching PathGlobs rooted at its root
        directory.

        This is a blocking operation, and should be avoided where possible.
        """
        return native_engine.capture_snapshots(
            self.py_scheduler,
            self.py_session,
            _PathGlobsAndRootCollection(path_globs_and_roots),
        )

    def single_file_digests_to_bytes(self, digests: Sequence[FileDigest]) -> list[bytes]:
        """Return the content bytes for each of the given file digests, in order."""
        return native_engine.single_file_digests_to_bytes(self.py_scheduler, list(digests))

    def snapshots_to_file_contents(
        self, snapshots: Sequence[Snapshot]
    ) -> tuple[DigestContents, ...]:
        """For each input `Snapshot`, yield a single `DigestContents` containing all the
        `FileContent`s corresponding to the file(s) contained within that `Snapshot`.

        Note that we cannot currently use a parallelized version of `self.product_request` since
        each snapshot needs to yield a separate `DigestContents`.
        """
        return tuple(
            self.product_request(DigestContents, snapshot.digest)[0] for snapshot in snapshots
        )

    def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
        """Ask the engine to ensure the remote store has the given digests (recursively)."""
        native_engine.ensure_remote_has_recursive(self.py_scheduler, list(digests))

    def ensure_directory_digest_persisted(self, digest: Digest) -> None:
        """Ask the engine to ensure the given directory digest is persisted."""
        native_engine.ensure_directory_digest_persisted(self.py_scheduler, digest)

    def write_digest(
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
    ) -> None:
        """Write a digest to disk, relative to the build root.

        :raises ValueError: if `path_prefix` is an absolute path.
        """
        if path_prefix and PurePath(path_prefix).is_absolute():
            raise ValueError(
                f"The `path_prefix` {path_prefix} must be a relative path, as the engine writes "
                "the digest relative to the build root."
            )
        native_engine.write_digest(
            self.py_scheduler, self.py_session, digest, path_prefix or "", clear_paths
        )

    def lease_files_in_graph(self) -> None:
        """Ask the engine to lease the files referenced from the graph for this session."""
        native_engine.lease_files_in_graph(self.py_scheduler, self.py_session)

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        """Delegate store garbage collection to the owning Scheduler."""
        self._scheduler.garbage_collect_store(target_size_bytes)

    def get_metrics(self) -> dict[str, int]:
        """Return this session's metrics as reported by the engine."""
        return native_engine.session_get_metrics(self.py_session)

    def get_observation_histograms(self) -> dict[str, Any]:
        """Return this session's observation histograms as reported by the engine."""
        return native_engine.session_get_observation_histograms(self.py_scheduler, self.py_session)

    def record_test_observation(self, value: int) -> None:
        """Record `value` as a test observation in this session."""
        native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

    @property
    def is_cancelled(self) -> bool:
        """Whether the native session has been cancelled."""
        return self.py_session.is_cancelled()

    def cancel(self) -> None:
        """Cancel the native session."""
        self.py_session.cancel()

    def wait_for_tail_tasks(self, timeout: float) -> None:
        """Wait up to `timeout` for the session's tail tasks, via the engine."""
        native_engine.session_wait_for_tail_tasks(self.py_scheduler, self.py_session, timeout)
671

672

673
def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
    """Build a native `PyTasks` object from the rules and queries in `rule_index`.

    Each rule is registered as a task, along with every call/Get it awaits. Each query
    is then registered, and the populated `PyTasks` is returned.

    :raises TypeError: If a single call or Get has more than one `@union` input type.
    """
    tasks = PyTasks()

    # Reverse index of union membership: member type -> every union base containing it.
    bases_by_member = defaultdict(list)
    for base, members in union_membership.items():
        for member in members:
            bases_by_member[member].append(base)

    # For each union base, the (rule, member type) pairs for every rule that takes one of
    # that base's member types as a parameter. These feed the polymorphic vtables below.
    implementors_by_base: dict[type, list[tuple[TaskRule, type[Any]]]] = defaultdict(list)
    rules_by_id: dict[str, TaskRule] = {}
    for candidate in rule_index.rules:
        rules_by_id[candidate.canonical_name] = candidate
        for param in candidate.parameters.values():
            for base in bases_by_member.get(param, ()):
                implementors_by_base[base].append((candidate, param))

    def register_awaitable(awaitable) -> None:
        """Record the edge(s) for one call/Get awaited by the task currently being built."""
        unions = [t for t in awaitable.input_types if is_union(t)]
        if len(unions) > 1:
            raise TypeError(
                f"Only one @union may be used in a call or Get, but {awaitable} used: {unions}."
            )

        if not unions:
            if awaitable.rule_id is not None:
                # A non-polymorphic call to a known rule.
                native_engine.tasks_add_call(
                    tasks,
                    awaitable.output_type,
                    awaitable.input_types,
                    awaitable.rule_id,
                    awaitable.explicit_args_arity,
                    vtable_entries=None,
                    in_scope_types=None,
                )
            else:
                # The Get subject is a concrete type, so a single Get edge suffices.
                native_engine.tasks_add_get(tasks, awaitable.output_type, awaitable.input_types)
            return

        (union,) = unions

        if not awaitable.rule_id:
            # A union Get: record one copy of the Get per union member, with that member
            # substituted for the union base among the input types.
            in_scope_types = union_in_scope_types(union)
            assert in_scope_types is not None
            for union_member in union_membership.get(union):
                native_engine.tasks_add_get_union(
                    tasks,
                    awaitable.output_type,
                    tuple(union_member if t == union else t for t in awaitable.input_types),
                    in_scope_types,
                )
            return

        if not rules_by_id[awaitable.rule_id].polymorphic:
            # NOTE(review): a by-name call to a non-polymorphic rule whose inputs include a
            # union type registers no edge at all here — confirm this is intended.
            return

        # A polymorphic call-by-name. Its vtable pairs each union member type with the id
        # of the rule implementing it: a rule that takes that member as a param and returns
        # the awaited output type.
        vtable_entries: list[tuple[type[Any], str]] = [
            (member_type, member_rule.canonical_name)
            for member_rule, member_type in implementors_by_base.get(union, ())
            if member_rule.output_type == awaitable.output_type
        ]
        in_scope_types = union_in_scope_types(union)
        assert in_scope_types is not None
        native_engine.tasks_add_call(
            tasks,
            awaitable.output_type,
            awaitable.input_types,
            awaitable.rule_id,
            awaitable.explicit_args_arity,
            vtable_entries,
            in_scope_types,
        )

    def register_task(rule: TaskRule) -> None:
        """Open a native task for `rule`, add its awaitables, then close the task."""
        params = rule.parameters
        native_engine.tasks_task_begin(
            tasks,
            rule.func,
            rule.output_type,
            tuple(params.items()),
            rule.masked_types,
            side_effecting=any(issubclass(p, SideEffecting) for p in params.values()),
            engine_aware_return_type=issubclass(rule.output_type, EngineAwareReturnType),
            cacheable=rule.cacheable,
            name=rule.canonical_name,
            desc=rule.desc or "",
            level=rule.level.level,
        )
        for awaitable in rule.awaitables:
            register_awaitable(awaitable)
        native_engine.tasks_task_end(tasks)

    for rule in rule_index.rules:
        register_task(rule)
    for query in rule_index.queries:
        native_engine.tasks_add_query(tasks, query.output_type, query.input_types)
    return tasks
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc