• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.85
/src/python/pants/engine/internals/scheduler.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
7✔
5

6
import logging
7✔
7
import os
7✔
8
import time
7✔
9
from collections import defaultdict
7✔
10
from collections.abc import Callable, Iterable, Sequence
7✔
11
from dataclasses import dataclass
7✔
12
from pathlib import PurePath
7✔
13
from types import CoroutineType
7✔
14
from typing import Any, NoReturn, cast
7✔
15

16
from typing_extensions import TypedDict
7✔
17

18
from pants.engine.collection import Collection
7✔
19
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType, SideEffecting
7✔
20
from pants.engine.fs import (
7✔
21
    CreateDigest,
22
    Digest,
23
    DigestContents,
24
    DigestEntries,
25
    DigestSubset,
26
    Directory,
27
    FileContent,
28
    FileDigest,
29
    FileEntry,
30
    NativeDownloadFile,
31
    PathGlobs,
32
    PathGlobsAndRoot,
33
    PathMetadataRequest,
34
    PathMetadataResult,
35
    Paths,
36
    Snapshot,
37
    SymlinkEntry,
38
)
39
from pants.engine.goal import CurrentExecutingGoals, Goal
7✔
40
from pants.engine.internals import native_engine
7✔
41
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
7✔
42
from pants.engine.internals.native_dep_inference import (
7✔
43
    NativeParsedDockerfileInfo,
44
    NativeParsedJavascriptDependencies,
45
    NativeParsedPythonDependencies,
46
    ParsedJavascriptDependencyCandidate,
47
)
48
from pants.engine.internals.native_engine import (
7✔
49
    PyExecutionRequest,
50
    PyExecutionStrategyOptions,
51
    PyExecutor,
52
    PyLocalStoreOptions,
53
    PyRemotingOptions,
54
    PyScheduler,
55
    PySession,
56
    PySessionCancellationLatch,
57
    PyTasks,
58
    PyTypes,
59
)
60
from pants.engine.internals.nodes import Return, Throw
7✔
61
from pants.engine.internals.selectors import Params
7✔
62
from pants.engine.internals.session import RunId, SessionValues
7✔
63
from pants.engine.platform import Platform
7✔
64
from pants.engine.process import (
7✔
65
    FallibleProcessResult,
66
    InteractiveProcess,
67
    InteractiveProcessResult,
68
    Process,
69
    ProcessResultMetadata,
70
)
71
from pants.engine.rules import Rule, RuleIndex, TaskRule
7✔
72
from pants.engine.unions import UnionMembership, is_union, union_in_scope_types
7✔
73
from pants.option.bootstrap_options import (
7✔
74
    LOCAL_STORE_LEASE_TIME_SECS,
75
    ExecutionOptions,
76
    LocalStoreOptions,
77
)
78
from pants.util.contextutil import temporary_file_path
7✔
79
from pants.util.logging import LogLevel
7✔
80
from pants.util.strutil import pluralize
7✔
81

82
logger = logging.getLogger(__name__)


# A single workunit: a mapping from string keys to arbitrary values. Used as the element
# type of the tuples in `PolledWorkunits`.
Workunit = dict[str, Any]
86

87

88
class PolledWorkunits(TypedDict):
    """The dict shape returned by `SchedulerSession.poll_workunits`."""

    # Filled from element 0 of the native `session_poll_workunits` result.
    started: tuple[Workunit, ...]
    # Filled from element 1 of the native `session_poll_workunits` result.
    completed: tuple[Workunit, ...]
91

92

93
@dataclass(frozen=True)
class ExecutionRequest:
    """Holds the roots for an execution, which might have been requested by a user.

    To create an ExecutionRequest, see `SchedulerSession.execution_request`.
    """

    # The requested roots, as `(product type, subject or Params)` pairs, in request order.
    roots: tuple[tuple[type, Any | Params], ...]
    # The native request object on which the same roots were registered.
    native: PyExecutionRequest
102

103

104
class ExecutionError(Exception):
    """Raised when an execution fails; carries the exceptions that caused the failure.

    :param message: The rendered error message, passed through to `Exception`.
    :param wrapped_exceptions: The underlying exceptions, if any. Any falsy value
      (including `None` or an empty container) is stored as an empty tuple.
    """

    def __init__(self, message, wrapped_exceptions=None):
        super().__init__(message)
        if wrapped_exceptions:
            self.wrapped_exceptions = wrapped_exceptions
        else:
            self.wrapped_exceptions = ()
108

109

110
class ExecutionTimeoutError(ExecutionError):
    """An ExecutionRequest specified a timeout which elapsed before the request completed.

    Raised by `SchedulerSession._execute` when the native engine raises `PollTimeout`.
    """
112

113

114
class Scheduler:
    """Python-side owner of the native engine scheduler.

    Construction registers the given rules, translates the bootstrap options into the native
    option objects and creates the native scheduler. The remaining methods are thin wrappers
    which forward to `native_engine` functions with that native scheduler.
    """

    def __init__(
        self,
        *,
        ignore_patterns: list[str],
        use_gitignore: bool,
        build_root: str,
        pants_workdir: str,
        local_execution_root_dir: str,
        named_caches_dir: str,
        ca_certs_path: str | None,
        rules: Iterable[Rule],
        union_membership: UnionMembership,
        execution_options: ExecutionOptions,
        local_store_options: LocalStoreOptions,
        executor: PyExecutor,
        include_trace_on_error: bool = True,
        visualize_to_dir: str | None = None,
        validate_reachability: bool = True,
        watch_filesystem: bool = True,
    ) -> None:
        """
        :param ignore_patterns: A list of gitignore-style file patterns for pants to ignore.
        :param use_gitignore: If set, pay attention to .gitignore files.
        :param build_root: The build root as a string.
        :param pants_workdir: The pants workdir as a string.
        :param local_execution_root_dir: The directory to use for local execution sandboxes.
        :param named_caches_dir: The directory to use as the root for named mutable caches.
        :param ca_certs_path: Path to pem file for custom CA, if needed.
        :param rules: A set of Rules which is used to compute values in the graph.
        :param union_membership: All the registered and normalized union rules.
        :param execution_options: Execution options for (remote) processes.
        :param local_store_options: Options for the engine's LMDB store(s).
        :param executor: The native executor handed to the native scheduler; it is also
          retained and exposed as `py_executor`.
        :param include_trace_on_error: Include the trace through the graph upon encountering errors.
        :param visualize_to_dir: If not None, a directory into which `rule_graph.dot` is written
          during construction (sessions also write their graph visualizations there).
        :param validate_reachability: True to assert that all rules in an otherwise successfully
          constructed rule graph are reachable: if a graph cannot be successfully constructed, it
          is always a fatal error.
        :param watch_filesystem: False if filesystem watching should be disabled.
        """
        self.include_trace_on_error = include_trace_on_error
        self._visualize_to_dir = visualize_to_dir
        self._visualize_run_count = 0
        # Validate and register all provided and intrinsic tasks.
        rule_index = RuleIndex.create(rules)
        tasks = register_rules(rule_index, union_membership)

        # Create the native Scheduler and Session.
        types = PyTypes(
            paths=Paths,
            path_metadata_request=PathMetadataRequest,
            path_metadata_result=PathMetadataResult,
            file_content=FileContent,
            file_entry=FileEntry,
            symlink_entry=SymlinkEntry,
            directory=Directory,
            digest_contents=DigestContents,
            digest_entries=DigestEntries,
            path_globs=PathGlobs,
            create_digest=CreateDigest,
            digest_subset=DigestSubset,
            native_download_file=NativeDownloadFile,
            platform=Platform,
            process=Process,
            process_result=FallibleProcessResult,
            process_result_metadata=ProcessResultMetadata,
            coroutine=CoroutineType,
            session_values=SessionValues,
            run_id=RunId,
            interactive_process=InteractiveProcess,
            interactive_process_result=InteractiveProcessResult,
            engine_aware_parameter=EngineAwareParameter,
            docker_resolve_image_request=DockerResolveImageRequest,
            docker_resolve_image_result=DockerResolveImageResult,
            parsed_python_deps_result=NativeParsedPythonDependencies,
            parsed_javascript_deps_result=NativeParsedJavascriptDependencies,
            parsed_dockerfile_info_result=NativeParsedDockerfileInfo,
            parsed_javascript_deps_candidate_result=ParsedJavascriptDependencyCandidate,
        )
        remoting_options = PyRemotingOptions(
            provider=execution_options.remote_provider.value,
            execution_enable=execution_options.remote_execution,
            store_headers=execution_options.remote_store_headers,
            store_chunk_bytes=execution_options.remote_store_chunk_bytes,
            store_rpc_retries=execution_options.remote_store_rpc_retries,
            store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
            store_rpc_timeout_millis=execution_options.remote_store_rpc_timeout_millis,
            store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
            store_batch_load_enabled=execution_options.remote_store_batch_load_enabled,
            cache_warnings_behavior=execution_options.remote_cache_warnings.value,
            cache_content_behavior=execution_options.cache_content_behavior.value,
            cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
            cache_rpc_timeout_millis=execution_options.remote_cache_rpc_timeout_millis,
            execution_headers=execution_options.remote_execution_headers,
            execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
            execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
            store_address=execution_options.remote_store_address,
            execution_address=execution_options.remote_execution_address,
            execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
            instance_name=execution_options.remote_instance_name,
            root_ca_certs_path=execution_options.remote_ca_certs_path,
            client_certs_path=execution_options.remote_client_certs_path,
            client_key_path=execution_options.remote_client_key_path,
            append_only_caches_base_path=execution_options.remote_execution_append_only_caches_base_path,
        )
        py_local_store_options = PyLocalStoreOptions(
            store_dir=local_store_options.store_dir,
            process_cache_max_size_bytes=local_store_options.processes_max_size_bytes,
            files_max_size_bytes=local_store_options.files_max_size_bytes,
            directories_max_size_bytes=local_store_options.directories_max_size_bytes,
            # The lease time constant is expressed in seconds; the native option takes millis.
            lease_time_millis=LOCAL_STORE_LEASE_TIME_SECS * 1000,
            shard_count=local_store_options.shard_count,
        )
        exec_strategy_opts = PyExecutionStrategyOptions(
            local_cache=execution_options.local_cache,
            remote_cache_read=execution_options.remote_cache_read,
            remote_cache_write=execution_options.remote_cache_write,
            use_sandboxer=execution_options.use_sandboxer,
            local_parallelism=execution_options.process_execution_local_parallelism,
            local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
            remote_parallelism=execution_options.process_execution_remote_parallelism,
            # A falsy (unset or zero) total is passed to the native side as 0.
            child_max_memory=execution_options.process_total_child_memory_usage or 0,
            child_default_memory=execution_options.process_per_child_memory_usage,
            graceful_shutdown_timeout=execution_options.process_execution_graceful_shutdown_timeout,
        )

        self._py_executor = executor
        self._py_scheduler = native_engine.scheduler_create(
            executor,
            tasks,
            types,
            build_root,
            pants_workdir,
            local_execution_root_dir,
            named_caches_dir,
            ignore_patterns,
            use_gitignore,
            watch_filesystem,
            remoting_options,
            py_local_store_options,
            exec_strategy_opts,
            ca_certs_path,
        )

        # If configured, visualize the rule graph before asserting that it is valid.
        if self._visualize_to_dir is not None:
            rule_graph_name = "rule_graph.dot"
            self.visualize_rule_graph_to_file(os.path.join(self._visualize_to_dir, rule_graph_name))

        if validate_reachability:
            native_engine.validate_reachability(self.py_scheduler)

    @property
    def py_scheduler(self) -> PyScheduler:
        """The native scheduler created in `__init__`."""
        return self._py_scheduler

    @property
    def py_executor(self) -> PyExecutor:
        """The native executor that was passed to `__init__`."""
        return self._py_executor

    def _to_params_list(self, subject_or_params: Any | Params) -> Sequence[Any]:
        """Normalize a single subject or a `Params` instance to a sequence of params."""
        if isinstance(subject_or_params, Params):
            return subject_or_params.params
        return [subject_or_params]

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        """Have the native engine write a visualization of the rule graph to `filename`."""
        native_engine.rule_graph_visualize(self.py_scheduler, filename)

    def visualize_rule_subgraph_to_file(
        self, filename: str, root_subject_types: list[type], product_type: type
    ) -> None:
        """Have the native engine write a visualization of a rule subgraph to `filename`.

        :param filename: The file to write to.
        :param root_subject_types: The root subject types selecting the subgraph.
        :param product_type: The product type selecting the subgraph.
        """
        native_engine.rule_subgraph_visualize(
            self.py_scheduler, root_subject_types, product_type, filename
        )

    def rule_graph_visualization(self):
        """Yield the lines of the rule graph visualization, with trailing whitespace stripped.

        The visualization is written to a temporary file and read back from it.
        """
        with temporary_file_path() as path:
            self.visualize_rule_graph_to_file(path)
            # NOTE(review): the file is read with the platform default encoding, as before;
            # confirm whether the native side always writes UTF-8.
            with open(path) as fd:
                # Iterate the file object directly rather than materializing `readlines()`.
                for line in fd:
                    yield line.rstrip()

    def rule_subgraph_visualization(self, root_subject_types: list[type], product_type: type):
        """Yield the lines of a rule subgraph visualization, with trailing whitespace stripped.

        The visualization is written to a temporary file and read back from it.

        :param root_subject_types: The root subject types selecting the subgraph.
        :param product_type: The product type selecting the subgraph.
        """
        with temporary_file_path() as path:
            self.visualize_rule_subgraph_to_file(path, root_subject_types, product_type)
            # NOTE(review): the file is read with the platform default encoding, as before;
            # confirm whether the native side always writes UTF-8.
            with open(path) as fd:
                # Iterate the file object directly rather than materializing `readlines()`.
                for line in fd:
                    yield line.rstrip()

    def rule_graph_consumed_types(
        self, root_subject_types: Sequence[type], product_type: type
    ) -> Sequence[type]:
        """Return the result of `native_engine.rule_graph_consumed_types` for the given roots."""
        return native_engine.rule_graph_consumed_types(
            self.py_scheduler, root_subject_types, product_type
        )

    def invalidate_files(self, filenames: Iterable[str]) -> int:
        """Invalidate the given paths in the native graph, returning the native call's count."""
        return native_engine.graph_invalidate_paths(self.py_scheduler, filenames)

    def invalidate_all_files(self) -> int:
        """Invalidate all paths in the native graph, returning the native call's count."""
        return native_engine.graph_invalidate_all_paths(self.py_scheduler)

    def invalidate_all(self) -> None:
        """Forward to `native_engine.graph_invalidate_all` for this scheduler."""
        native_engine.graph_invalidate_all(self.py_scheduler)

    def check_invalidation_watcher_liveness(self) -> None:
        """Forward to `native_engine.check_invalidation_watcher_liveness` for this scheduler."""
        native_engine.check_invalidation_watcher_liveness(self.py_scheduler)

    def graph_len(self) -> int:
        """Return `native_engine.graph_len` for this scheduler."""
        return native_engine.graph_len(self.py_scheduler)

    def execution_add_root_select(
        self, execution_request: PyExecutionRequest, subject_or_params: Any | Params, product: type
    ) -> None:
        """Register a root on the given native execution request.

        :param execution_request: The native request to add the root to.
        :param subject_or_params: A single subject, or a `Params` instance holding several.
        :param product: The product type requested for the subject(s).
        """
        params = self._to_params_list(subject_or_params)
        native_engine.execution_add_root_select(
            self.py_scheduler, execution_request, params, product
        )

    @property
    def visualize_to_dir(self) -> str | None:
        """The directory that visualizations are written to, or None if not configured."""
        return self._visualize_to_dir

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        """Forward to `native_engine.garbage_collect_store` with the given target size."""
        native_engine.garbage_collect_store(self.py_scheduler, target_size_bytes)

    def new_session(
        self,
        build_id: str,
        dynamic_ui: bool = False,
        ui_use_prodash: bool = False,
        max_workunit_level: LogLevel = LogLevel.DEBUG,
        session_values: SessionValues | None = None,
        cancellation_latch: PySessionCancellationLatch | None = None,
    ) -> SchedulerSession:
        """Creates a new SchedulerSession for this Scheduler.

        When `session_values` or `cancellation_latch` are not given (or are falsy), fresh
        `SessionValues()` / `PySessionCancellationLatch()` instances are used.
        """
        return SchedulerSession(
            self,
            PySession(
                scheduler=self.py_scheduler,
                dynamic_ui=dynamic_ui,
                ui_use_prodash=ui_use_prodash,
                max_workunit_level=max_workunit_level.level,
                build_id=build_id,
                session_values=session_values or SessionValues(),
                cancellation_latch=cancellation_latch or PySessionCancellationLatch(),
            ),
        )

    def shutdown(self, timeout_secs: int = 60) -> None:
        """Forward to `native_engine.scheduler_shutdown` with the given timeout (in seconds)."""
        native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)
364

365

366
# A typed Collection of PathGlobsAndRoot; `SchedulerSession.capture_snapshots` wraps its
# input in this type before handing it to `native_engine.capture_snapshots`.
class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
    pass
368

369

370
class SchedulerSession:
    """A handle to a shared underlying Scheduler and a unique Session.

    Generally a Session corresponds to a single run of pants: some metrics are specific to a
    Session.
    """

    def __init__(self, scheduler: Scheduler, session: PySession) -> None:
        """
        :param scheduler: The shared Scheduler that this session runs against.
        :param session: The native session backing this handle.
        """
        self._scheduler = scheduler
        self._py_session = session
        # Reuse the goal tracker stored in the session values if there is one.
        self._goals = session.session_values.get(CurrentExecutingGoals) or CurrentExecutingGoals()

    @property
    def scheduler(self) -> Scheduler:
        """The Scheduler that this session was created with."""
        return self._scheduler

    @property
    def py_scheduler(self) -> PyScheduler:
        """The native scheduler of the underlying Scheduler."""
        return self._scheduler.py_scheduler

    @property
    def py_session(self) -> PySession:
        """The native session that this handle was created with."""
        return self._py_session

    def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
        """Return a new SchedulerSession on the same Scheduler, wrapping the native session
        produced by `native_engine.session_isolated_shallow_clone` for `build_id`."""
        return SchedulerSession(
            self._scheduler,
            native_engine.session_isolated_shallow_clone(self._py_session, build_id),
        )

    def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
        """Poll the native session for workunits.

        :param max_log_verbosity: The level whose `.level` is passed to the native poll call.
        :returns: The first and second elements of the native result, as the `started` and
          `completed` entries respectively.
        """
        result = native_engine.session_poll_workunits(
            self.py_scheduler, self.py_session, max_log_verbosity.level
        )
        return {"started": result[0], "completed": result[1]}

    def new_run_id(self) -> None:
        """Assigns a new "run id" to this Session, without creating a new Session.

        Usually each Session corresponds to one end user "run", but there are exceptions: notably,
        the `--loop` feature uses one Session, but would like to observe new values for uncacheable
        nodes in each iteration of its loop.
        """
        native_engine.session_new_run_id(self.py_session)

    def visualize_graph_to_file(self, filename: str) -> None:
        """Visualize a graph walk by writing graphviz `dot` output to a file."""
        native_engine.graph_visualize(self.py_scheduler, self.py_session, filename)

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        """Delegate to `Scheduler.visualize_rule_graph_to_file`."""
        self._scheduler.visualize_rule_graph_to_file(filename)

    def rule_graph_rule_gets(self) -> dict[Callable, list[tuple[type, list[type], Callable]]]:
        """Return the result of `native_engine.rule_graph_rule_gets` for the scheduler."""
        return native_engine.rule_graph_rule_gets(self.py_scheduler)

    def execution_request(
        self,
        requests: Sequence[tuple[type, Any | Params]],
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> ExecutionRequest:
        """Create and return an ExecutionRequest for the given (product, subject) pairs.

        :param requests: A sequence of product types to request for subjects.
        :param poll: True to wait for _all_ of the given roots to
          have changed since their last observed values in this SchedulerSession.
        :param poll_delay: A delay (in seconds) to wait after observing a change, and before
          beginning to compute a new value. A falsy value (None or 0) is sent as None.
        :param timeout: An optional timeout to wait for the request to complete (in seconds). If the
          request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
          A falsy value (None or 0) is sent as None.
        :returns: An ExecutionRequest for the given products and subjects.
        """
        native_execution_request = PyExecutionRequest(
            poll=poll,
            # The native request takes whole milliseconds; fractions are truncated by `int`.
            poll_delay_in_ms=int(poll_delay * 1000) if poll_delay else None,
            timeout_in_ms=int(timeout * 1000) if timeout else None,
        )
        for product, subject in requests:
            self._scheduler.execution_add_root_select(native_execution_request, subject, product)
        return ExecutionRequest(tuple(requests), native_execution_request)

    def invalidate_files(self, direct_filenames: Iterable[str]) -> int:
        """Invalidates the given filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_files(direct_filenames)
        self._maybe_visualize()
        return invalidated

    def invalidate_all_files(self) -> int:
        """Invalidates all filenames in an internal product Graph instance."""
        invalidated = self._scheduler.invalidate_all_files()
        self._maybe_visualize()
        return invalidated

    def metrics(self) -> dict[str, int]:
        """Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
        return native_engine.scheduler_metrics(self.py_scheduler, self.py_session)

    def live_items(self) -> tuple[list[Any], dict[str, tuple[int, int]]]:
        """Return all Python objects held by the Scheduler."""
        return native_engine.scheduler_live_items(self.py_scheduler, self.py_session)

    def _maybe_visualize(self) -> None:
        """If the Scheduler has a visualization directory, write the next numbered graph file.

        Files are named `graph.NNN.dot`, numbered by a counter stored on the Scheduler.
        """
        if self._scheduler.visualize_to_dir is not None:
            # TODO: This increment-and-get is racey.
            name = f"graph.{self._scheduler._visualize_run_count:03d}.dot"
            self._scheduler._visualize_run_count += 1
            # Lazy %-formatting: the message is only rendered if the record is emitted.
            logger.info("Visualizing graph as %s", name)
            self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))

    def teardown_dynamic_ui(self) -> None:
        """Forward to `native_engine.teardown_dynamic_ui` for this session."""
        native_engine.teardown_dynamic_ui(self.py_scheduler, self.py_session)

    def _execute(
        self, execution_request: ExecutionRequest
    ) -> tuple[tuple[tuple[Any, Return], ...], tuple[tuple[Any, Throw], ...]]:
        """Run the native engine for the request and partition the root results.

        :param execution_request: The request to execute.
        :returns: A pair `(returns, throws)`. Each is a tuple of `(root, state)` pairs, where
          `root` is the matching entry of `execution_request.roots` and `state` is a `Return`
          or a `Throw` respectively. Each tuple preserves the order of the roots.
        :raises ExecutionTimeoutError: If the native engine raises `PollTimeout`.
        """
        # Use a monotonic clock so that the logged duration is unaffected by wall clock changes.
        start_time = time.monotonic()
        try:
            raw_roots = native_engine.scheduler_execute(
                self.py_scheduler,
                self.py_session,
                execution_request.native,
            )
        except native_engine.PollTimeout as e:
            raise ExecutionTimeoutError("Timed out") from e

        states = [
            (
                Throw(
                    raw_root.result,
                    python_traceback=raw_root.python_traceback,
                    engine_traceback=raw_root.engine_traceback,
                )
                if raw_root.is_throw
                else Return(raw_root.result)
            )
            for raw_root in raw_roots
        ]

        # Pair each requested root with its resulting state, relying on matching order.
        roots = list(zip(execution_request.roots, states))

        self._maybe_visualize()
        logger.debug(
            "computed %s nodes in %f seconds. there are %s total nodes.",
            len(roots),
            time.monotonic() - start_time,
            self._scheduler.graph_len(),
        )

        returns = tuple((root, state) for root, state in roots if isinstance(state, Return))
        throws = tuple((root, state) for root, state in roots if isinstance(state, Throw))
        return returns, throws

    def _raise_on_error(self, throws: list[Throw]) -> NoReturn:
        """Raise an ExecutionError describing the given Throws.

        The message renders only the first Throw (with a trace if the Scheduler's
        `include_trace_on_error` is set) and mentions how many more there were. All of the
        exceptions are attached as `wrapped_exceptions`.

        :param throws: The Throws to report; expected to be non-empty, since the first
          element is indexed directly.
        :raises ExecutionError: Always.
        """
        exception_noun = pluralize(len(throws), "Exception")
        others_msg = f"\n(and {len(throws) - 1} more)\n" if len(throws) > 1 else ""
        raise ExecutionError(
            f"{exception_noun} encountered:\n\n"
            f"{throws[0].render(self._scheduler.include_trace_on_error)}\n"
            f"{others_msg}",
            wrapped_exceptions=tuple(t.exc for t in throws),
        )

    def execute(self, execution_request: ExecutionRequest) -> list[Any]:
        """Invoke the engine for the given ExecutionRequest, returning successful values or raising.

        :return: A sequence of per-request results.
        :raises ExecutionError: If any root resulted in a Throw.
        """
        returns, throws = self._execute(execution_request)

        # Throw handling.
        if throws:
            self._raise_on_error([t for _, t in throws])

        # Everything is a Return: we rely on the fact that roots are ordered to preserve subject
        # order in output lists.
        return [ret.value for _, ret in returns]

    def run_goal_rule(
        self,
        product: type[Goal],
        subject: Params,
        *,
        poll: bool = False,
        poll_delay: float | None = None,
    ) -> int:
        """
        :param product: A Goal subtype.
        :param subject: subject for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :returns: An exit_code for the given Goal.
        """
        if self._scheduler.visualize_to_dir is not None:
            # Write the rule subgraph for this goal before running it.
            rule_graph_name = f"rule_graph.{product.name}.dot"
            params = self._scheduler._to_params_list(subject)
            self._scheduler.visualize_rule_subgraph_to_file(
                os.path.join(self._scheduler.visualize_to_dir, rule_graph_name),
                [type(p) for p in params],
                product,
            )
        with self._goals._execute(product):
            (return_value,) = self.product_request(
                product, [subject], poll=poll, poll_delay=poll_delay
            )
        return cast(int, return_value.exit_code)

    def product_request(
        self,
        product: type,
        subjects: Sequence[Any | Params],
        *,
        poll: bool = False,
        poll_delay: float | None = None,
        timeout: float | None = None,
    ) -> list:
        """Executes a request for a single product for some subjects, and returns the products.

        :param product: A product type for the request.
        :param subjects: A list of subjects or Params instances for the request.
        :param poll: See self.execution_request.
        :param poll_delay: See self.execution_request.
        :param timeout: See self.execution_request.
        :returns: A list of the requested products, with length match len(subjects).
        """
        request = self.execution_request(
            [(product, subject) for subject in subjects],
            poll=poll,
            poll_delay=poll_delay,
            timeout=timeout,
        )
        return self.execute(request)

    def capture_snapshots(self, path_globs_and_roots: Iterable[PathGlobsAndRoot]) -> list[Snapshot]:
        """Synchronously captures Snapshots for each matching PathGlobs rooted at its root
        directory.

        This is a blocking operation, and should be avoided where possible.
        """
        return native_engine.capture_snapshots(
            self.py_scheduler,
            self.py_session,
            _PathGlobsAndRootCollection(path_globs_and_roots),
        )

    def single_file_digests_to_bytes(self, digests: Sequence[FileDigest]) -> list[bytes]:
        """Return the result of `native_engine.single_file_digests_to_bytes` for the digests."""
        return native_engine.single_file_digests_to_bytes(self.py_scheduler, list(digests))

    def snapshots_to_file_contents(
        self, snapshots: Sequence[Snapshot]
    ) -> tuple[DigestContents, ...]:
        """For each input `Snapshot`, yield a single `DigestContents` containing all the
        `FileContent`s corresponding to the file(s) contained within that `Snapshot`.

        Note that we cannot currently use a parallelized version of `self.product_request` since
        each snapshot needs to yield a separate `DigestContents`.
        """
        return tuple(
            self.product_request(DigestContents, [snapshot.digest])[0] for snapshot in snapshots
        )

    def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
        """Forward to `native_engine.ensure_remote_has_recursive` with a list of the digests."""
        native_engine.ensure_remote_has_recursive(self.py_scheduler, list(digests))

    def ensure_directory_digest_persisted(self, digest: Digest) -> None:
        """Forward to `native_engine.ensure_directory_digest_persisted` for the digest."""
        native_engine.ensure_directory_digest_persisted(self.py_scheduler, digest)

    def write_digest(
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
    ) -> None:
        """Write a digest to disk, relative to the build root.

        :param digest: The digest to write.
        :param path_prefix: An optional relative path passed to the native call; None or an
          empty string is passed as "".
        :param clear_paths: Paths passed through to the native call as `clear_paths`.
        :raises ValueError: If `path_prefix` is an absolute path.
        """
        if path_prefix and PurePath(path_prefix).is_absolute():
            raise ValueError(
                f"The `path_prefix` {path_prefix} must be a relative path, as the engine writes "
                "the digest relative to the build root."
            )
        native_engine.write_digest(
            self.py_scheduler, self.py_session, digest, path_prefix or "", clear_paths
        )

    def lease_files_in_graph(self) -> None:
        """Forward to `native_engine.lease_files_in_graph` for this session."""
        native_engine.lease_files_in_graph(self.py_scheduler, self.py_session)

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        """Delegate to `Scheduler.garbage_collect_store`."""
        self._scheduler.garbage_collect_store(target_size_bytes)

    def get_metrics(self) -> dict[str, int]:
        """Return the result of `native_engine.session_get_metrics` for this session."""
        return native_engine.session_get_metrics(self.py_session)

    def get_observation_histograms(self) -> dict[str, Any]:
        """Return the result of `native_engine.session_get_observation_histograms`."""
        return native_engine.session_get_observation_histograms(self.py_scheduler, self.py_session)

    def record_test_observation(self, value: int) -> None:
        """Forward `value` to `native_engine.session_record_test_observation`."""
        native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

    @property
    def is_cancelled(self) -> bool:
        """Whether the native session reports itself as cancelled."""
        return self.py_session.is_cancelled()

    def cancel(self) -> None:
        """Cancel the native session."""
        self.py_session.cancel()

    def wait_for_tail_tasks(self, timeout: float) -> None:
        """Forward to `native_engine.session_wait_for_tail_tasks` with the given timeout."""
        native_engine.session_wait_for_tail_tasks(self.py_scheduler, self.py_session, timeout)
674

675

676
def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
    """Build a native `PyTasks` populated from every rule and query in `rule_index`.

    Each rule is registered between a `tasks_task_begin` / `tasks_task_end` pair, with native
    edges recorded for its awaitables. `union_membership` is used to expand awaitables that
    take a `@union` type as input.

    :raises TypeError: If a single awaitable has more than one `@union` input type.
    """
    tasks = PyTasks()

    # Invert the union membership: member type -> the union base types it belongs to.
    bases_by_member = defaultdict(list)
    for union_base, members in union_membership.items():
        for member in members:
            bases_by_member[member].append(union_base)

    # For each union base type, collect (rule, member type) pairs where the rule takes that
    # member type as one of its parameters. Also index every rule by its canonical name.
    implementers_by_base: dict[type, list[tuple[TaskRule, type[Any]]]] = defaultdict(list)
    rules_by_id: dict[str, TaskRule] = {}
    for candidate in rule_index.rules:
        rules_by_id[candidate.canonical_name] = candidate
        for param_type in candidate.parameters.values():
            for union_base in bases_by_member.get(param_type, ()):
                implementers_by_base[union_base].append((candidate, param_type))

    def register_task(rule: TaskRule) -> None:
        native_engine.tasks_task_begin(
            tasks,
            rule.func,
            rule.output_type,
            tuple(rule.parameters.items()),
            rule.masked_types,
            side_effecting=any(issubclass(t, SideEffecting) for t in rule.parameters.values()),
            engine_aware_return_type=issubclass(rule.output_type, EngineAwareReturnType),
            cacheable=rule.cacheable,
            name=rule.canonical_name,
            desc=rule.desc or "",
            level=rule.level.level,
        )

        for awaitable in rule.awaitables:
            unions = [t for t in awaitable.input_types if is_union(t)]

            if len(unions) > 1:
                raise TypeError(
                    f"Only one @union may be used in a call or Get, but {awaitable} used: {unions}."
                )

            if not unions:
                if awaitable.rule_id is not None:
                    # A non-polymorphic call to a known rule.
                    native_engine.tasks_add_call(
                        tasks,
                        awaitable.output_type,
                        awaitable.input_types,
                        awaitable.rule_id,
                        awaitable.explicit_args_arity,
                        vtable_entries=None,
                        in_scope_types=None,
                    )
                else:
                    # The Get subject is a "concrete" type, so a single Get edge suffices.
                    native_engine.tasks_add_get(
                        tasks, awaitable.output_type, awaitable.input_types
                    )
                continue

            # Exactly one union type appears among the inputs from here on.
            union = unions[0]

            if not awaitable.rule_id:
                # A union Get: record one copy of the Get per union member, substituting the
                # member type for the union type in the inputs.
                in_scope_types = union_in_scope_types(union)
                assert in_scope_types is not None
                for union_member in union_membership.get(union):
                    native_engine.tasks_add_get_union(
                        tasks,
                        awaitable.output_type,
                        tuple(union_member if t == union else t for t in awaitable.input_types),
                        in_scope_types,
                    )
                continue

            if not rules_by_id[awaitable.rule_id].polymorphic:
                # NOTE(review): a call-by-name with a union input whose target rule is not
                # polymorphic records no edge at all -- confirm that this is intended.
                continue

            # A polymorphic call-by-name. Its vtable is a list of pairs
            # (union member type, id of the implementation rule for that type): a rule that
            # takes a union member as a param and returns the awaited output type is taken to
            # be the implementation of the polymorphic rule for that member.
            vtable_entries: list[tuple[type[Any], str]] = [
                (member_type, member_rule.canonical_name)
                for member_rule, member_type in implementers_by_base.get(union, ())
                if member_rule.output_type == awaitable.output_type
            ]
            in_scope_types = union_in_scope_types(union)
            assert in_scope_types is not None
            native_engine.tasks_add_call(
                tasks,
                awaitable.output_type,
                awaitable.input_types,
                awaitable.rule_id,
                awaitable.explicit_args_arity,
                vtable_entries,
                in_scope_types,
            )

        native_engine.tasks_task_end(tasks)

    for task_rule in rule_index.rules:
        register_task(task_rule)
    for query in rule_index.queries:
        native_engine.tasks_add_query(tasks, query.output_type, query.input_types)
    return tasks
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc