pantsbuild / pants / build 21803785359
08 Feb 2026 07:13PM UTC. Coverage: 43.3% (-37.0%) from 80.277%.

Pull Request #23085: A helper method for indexing paths by source root
Merge 7c1cd926d into 40389cc58 (github / web-flow)

2 of 6 new or added lines in 1 file covered (33.33%).
17114 existing lines in 539 files now uncovered.
26075 of 60219 relevant lines covered (43.3%).
0.43 hits per line.

Source File: /src/python/pants/engine/internals/scheduler.py (77.19% covered)

# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import logging
import os
import time
from collections import defaultdict
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from pathlib import PurePath
from types import CoroutineType
from typing import Any, NoReturn, cast

from typing_extensions import TypedDict

from pants.engine.collection import Collection
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType, SideEffecting
from pants.engine.fs import (
    CreateDigest,
    Digest,
    DigestContents,
    DigestEntries,
    DigestSubset,
    Directory,
    FileContent,
    FileDigest,
    FileEntry,
    NativeDownloadFile,
    PathGlobs,
    PathGlobsAndRoot,
    PathMetadataRequest,
    PathMetadataResult,
    Paths,
    Snapshot,
    SymlinkEntry,
)
from pants.engine.goal import CurrentExecutingGoals, Goal
from pants.engine.internals import native_engine
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
    JavascriptDependencyCandidate,
    NativeDockerfileInfo,
    NativeJavascriptFileDependencies,
    NativePythonFileDependencies,
)
from pants.engine.internals.native_engine import (
    PyExecutionRequest,
    PyExecutionStrategyOptions,
    PyExecutor,
    PyLocalStoreOptions,
    PyRemotingOptions,
    PyScheduler,
    PySession,
    PySessionCancellationLatch,
    PyTasks,
    PyTypes,
)
from pants.engine.internals.nodes import Return, Throw
from pants.engine.internals.selectors import Params
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.platform import Platform
from pants.engine.process import (
    FallibleProcessResult,
    InteractiveProcess,
    InteractiveProcessResult,
    Process,
    ProcessResultMetadata,
)
from pants.engine.rules import Rule, RuleIndex, TaskRule
from pants.engine.unions import UnionMembership, is_union, union_in_scope_types
from pants.option.bootstrap_options import (
    LOCAL_STORE_LEASE_TIME_SECS,
    ExecutionOptions,
    LocalStoreOptions,
)
from pants.util.contextutil import temporary_file_path
from pants.util.logging import LogLevel
from pants.util.strutil import pluralize

logger = logging.getLogger(__name__)


Workunit = dict[str, Any]


class PolledWorkunits(TypedDict):
    started: tuple[Workunit, ...]
    completed: tuple[Workunit, ...]


@dataclass(frozen=True)
class ExecutionRequest:
    """Holds the roots for an execution, which might have been requested by a user.

    To create an ExecutionRequest, see `SchedulerSession.execution_request`.
    """

    native: PyExecutionRequest


class ExecutionError(Exception):
    def __init__(self, message, wrapped_exceptions=None):
        super().__init__(message)
        self.wrapped_exceptions = wrapped_exceptions or ()


class ExecutionTimeoutError(ExecutionError):
    """An ExecutionRequest specified a timeout which elapsed before the request completed."""


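# Illustrative sketch, not part of the original module: how a caller might surface the
# individual failures that an ExecutionError aggregates. `session` and `request` are
# hypothetical stand-ins for a SchedulerSession and an ExecutionRequest built elsewhere.
def _example_handle_execution_error(
    session: SchedulerSession, request: ExecutionRequest
) -> list[Any]:
    try:
        return session.execute(request)
    except ExecutionTimeoutError:
        # The request's `timeout` elapsed before all roots completed.
        raise
    except ExecutionError as e:
        # Each wrapped exception corresponds to one failed root of the request.
        for exc in e.wrapped_exceptions:
            logger.error("Root failed: %r", exc)
        raise
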
class Scheduler:
    def __init__(
        self,
        *,
        ignore_patterns: list[str],
        use_gitignore: bool,
        build_root: str,
        pants_workdir: str,
        local_execution_root_dir: str,
        named_caches_dir: str,
        ca_certs_path: str | None,
        rules: Iterable[Rule],
        union_membership: UnionMembership,
        execution_options: ExecutionOptions,
        local_store_options: LocalStoreOptions,
        executor: PyExecutor,
        include_trace_on_error: bool = True,
        visualize_to_dir: str | None = None,
        validate_reachability: bool = True,
        watch_filesystem: bool = True,
    ) -> None:
        """
        :param ignore_patterns: A list of gitignore-style file patterns for pants to ignore.
        :param use_gitignore: If set, pay attention to .gitignore files.
        :param build_root: The build root as a string.
        :param pants_workdir: The pants workdir as a string.
        :param local_execution_root_dir: The directory to use for local execution sandboxes.
        :param named_caches_dir: The directory to use as the root for named mutable caches.
        :param ca_certs_path: Path to pem file for custom CA, if needed.
        :param rules: A set of Rules which is used to compute values in the graph.
        :param union_membership: All the registered and normalized union rules.
        :param execution_options: Execution options for (remote) processes.
        :param local_store_options: Options for the engine's LMDB store(s).
        :param include_trace_on_error: Include the trace through the graph upon encountering errors.
        :param validate_reachability: True to assert that all rules in an otherwise successfully
          constructed rule graph are reachable: if a graph cannot be successfully constructed, it
          is always a fatal error.
        :param watch_filesystem: False if filesystem watching should be disabled.
        """
        self.include_trace_on_error = include_trace_on_error
        self._visualize_to_dir = visualize_to_dir
        self._visualize_run_count = 0
        # Validate and register all provided and intrinsic tasks.
        rule_index = RuleIndex.create(rules)
        tasks = register_rules(rule_index, union_membership)

        # Create the native Scheduler and Session.
        types = PyTypes(
            paths=Paths,
            path_metadata_request=PathMetadataRequest,
            path_metadata_result=PathMetadataResult,
            file_content=FileContent,
            file_entry=FileEntry,
            symlink_entry=SymlinkEntry,
            directory=Directory,
            digest_contents=DigestContents,
            digest_entries=DigestEntries,
            path_globs=PathGlobs,
            create_digest=CreateDigest,
            digest_subset=DigestSubset,
            native_download_file=NativeDownloadFile,
            platform=Platform,
            process=Process,
            process_result=FallibleProcessResult,
            process_result_metadata=ProcessResultMetadata,
            coroutine=CoroutineType,
            session_values=SessionValues,
            run_id=RunId,
            interactive_process=InteractiveProcess,
            interactive_process_result=InteractiveProcessResult,
            engine_aware_parameter=EngineAwareParameter,
            docker_resolve_image_request=DockerResolveImageRequest,
            docker_resolve_image_result=DockerResolveImageResult,
            parsed_python_deps_result=NativePythonFileDependencies,
            parsed_javascript_deps_result=NativeJavascriptFileDependencies,
            parsed_dockerfile_info_result=NativeDockerfileInfo,
            parsed_javascript_deps_candidate_result=JavascriptDependencyCandidate,
        )
        remoting_options = PyRemotingOptions(
            provider=execution_options.remote_provider.value,
            execution_enable=execution_options.remote_execution,
            store_headers=execution_options.remote_store_headers,
            store_chunk_bytes=execution_options.remote_store_chunk_bytes,
            store_rpc_retries=execution_options.remote_store_rpc_retries,
            store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
            store_rpc_timeout_millis=execution_options.remote_store_rpc_timeout_millis,
            store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
            store_batch_load_enabled=execution_options.remote_store_batch_load_enabled,
            cache_warnings_behavior=execution_options.remote_cache_warnings.value,
            cache_content_behavior=execution_options.cache_content_behavior.value,
            cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
            cache_rpc_timeout_millis=execution_options.remote_cache_rpc_timeout_millis,
            execution_headers=execution_options.remote_execution_headers,
            execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
            execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
            store_address=execution_options.remote_store_address,
            execution_address=execution_options.remote_execution_address,
            execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
            instance_name=execution_options.remote_instance_name,
            root_ca_certs_path=execution_options.remote_ca_certs_path,
            client_certs_path=execution_options.remote_client_certs_path,
            client_key_path=execution_options.remote_client_key_path,
            append_only_caches_base_path=execution_options.remote_execution_append_only_caches_base_path,
        )
        py_local_store_options = PyLocalStoreOptions(
            store_dir=local_store_options.store_dir,
            process_cache_max_size_bytes=local_store_options.processes_max_size_bytes,
            files_max_size_bytes=local_store_options.files_max_size_bytes,
            directories_max_size_bytes=local_store_options.directories_max_size_bytes,
            lease_time_millis=LOCAL_STORE_LEASE_TIME_SECS * 1000,
            shard_count=local_store_options.shard_count,
        )
        exec_strategy_opts = PyExecutionStrategyOptions(
            local_cache=execution_options.local_cache,
            remote_cache_read=execution_options.remote_cache_read,
            remote_cache_write=execution_options.remote_cache_write,
            use_sandboxer=execution_options.use_sandboxer,
            local_parallelism=execution_options.process_execution_local_parallelism,
            local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
            remote_parallelism=execution_options.process_execution_remote_parallelism,
            child_max_memory=execution_options.process_total_child_memory_usage or 0,
            child_default_memory=execution_options.process_per_child_memory_usage,
            graceful_shutdown_timeout=execution_options.process_execution_graceful_shutdown_timeout,
        )

        self._py_executor = executor
        self._py_scheduler = native_engine.scheduler_create(
            executor,
            tasks,
            types,
            build_root,
            pants_workdir,
            local_execution_root_dir,
            named_caches_dir,
            ignore_patterns,
            use_gitignore,
            watch_filesystem,
            remoting_options,
            py_local_store_options,
            exec_strategy_opts,
            ca_certs_path,
        )

        # If configured, visualize the rule graph before asserting that it is valid.
        if self._visualize_to_dir is not None:
            rule_graph_name = "rule_graph.dot"
            self.visualize_rule_graph_to_file(os.path.join(self._visualize_to_dir, rule_graph_name))

        if validate_reachability:
            native_engine.validate_reachability(self.py_scheduler)

    @property
    def py_scheduler(self) -> PyScheduler:
        return self._py_scheduler

    @property
    def py_executor(self) -> PyExecutor:
        return self._py_executor

    def _to_params_list(self, subject_or_params: Any | Params) -> Sequence[Any]:
        if isinstance(subject_or_params, Params):
            return subject_or_params.params
        return [subject_or_params]

    def visualize_rule_graph_to_file(self, filename: str) -> None:
        native_engine.rule_graph_visualize(self.py_scheduler, filename)

    def visualize_rule_subgraph_to_file(
        self, filename: str, root_subject_types: list[type], product_type: type
    ) -> None:
        native_engine.rule_subgraph_visualize(
            self.py_scheduler, root_subject_types, product_type, filename
        )

    def rule_graph_visualization(self):
        with temporary_file_path() as path:
            self.visualize_rule_graph_to_file(path)
            with open(path) as fd:
                for line in fd.readlines():
                    yield line.rstrip()

    def rule_subgraph_visualization(self, root_subject_types: list[type], product_type: type):
        with temporary_file_path() as path:
            self.visualize_rule_subgraph_to_file(path, root_subject_types, product_type)
            with open(path) as fd:
                for line in fd.readlines():
                    yield line.rstrip()

    def rule_graph_consumed_types(
        self, root_subject_types: Sequence[type], product_type: type
    ) -> Sequence[type]:
        return native_engine.rule_graph_consumed_types(
            self.py_scheduler, root_subject_types, product_type
        )

    def invalidate_files(self, filenames: Iterable[str]) -> int:
        return native_engine.graph_invalidate_paths(self.py_scheduler, filenames)

    def invalidate_all_files(self) -> int:
        return native_engine.graph_invalidate_all_paths(self.py_scheduler)

    def invalidate_all(self) -> None:
        native_engine.graph_invalidate_all(self.py_scheduler)

    def check_invalidation_watcher_liveness(self) -> None:
        native_engine.check_invalidation_watcher_liveness(self.py_scheduler)

    def graph_len(self) -> int:
        return native_engine.graph_len(self.py_scheduler)

    def execution_add_root_select(
        self, execution_request: PyExecutionRequest, subject_or_params: Any | Params, product: type
    ) -> None:
        params = self._to_params_list(subject_or_params)
        native_engine.execution_add_root_select(
            self.py_scheduler, execution_request, params, product
        )

    @property
    def visualize_to_dir(self) -> str | None:
        return self._visualize_to_dir

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        native_engine.garbage_collect_store(self.py_scheduler, target_size_bytes)

    def new_session(
        self,
        build_id: str,
        dynamic_ui: bool = False,
        ui_use_prodash: bool = False,
        max_workunit_level: LogLevel = LogLevel.DEBUG,
        session_values: SessionValues | None = None,
        cancellation_latch: PySessionCancellationLatch | None = None,
    ) -> SchedulerSession:
        """Creates a new SchedulerSession for this Scheduler."""
        return SchedulerSession(
            self,
            PySession(
                scheduler=self.py_scheduler,
                dynamic_ui=dynamic_ui,
                ui_use_prodash=ui_use_prodash,
                max_workunit_level=max_workunit_level.level,
                build_id=build_id,
                session_values=session_values or SessionValues(),
                cancellation_latch=cancellation_latch or PySessionCancellationLatch(),
            ),
        )

    def shutdown(self, timeout_secs: int = 60) -> None:
        native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)

365
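# Illustrative sketch, not part of the original module: creating a per-run session from an
# already-constructed Scheduler. The underlying Scheduler (and its memoized graph) is shared,
# while each pants run typically gets its own SchedulerSession. `scheduler` and `build_id`
# are assumed to exist.
def _example_new_session(scheduler: Scheduler, build_id: str) -> SchedulerSession:
    return scheduler.new_session(
        build_id,
        dynamic_ui=False,
        max_workunit_level=LogLevel.DEBUG,
        session_values=SessionValues(),
        cancellation_latch=PySessionCancellationLatch(),
    )
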
class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
1✔
366
    pass
1✔
367

368

369
class SchedulerSession:
1✔
370
    """A handle to a shared underlying Scheduler and a unique Session.
371

372
    Generally a Session corresponds to a single run of pants: some metrics are specific to a
373
    Session.
374
    """
375

376
    def __init__(self, scheduler: Scheduler, session: PySession) -> None:
1✔
377
        self._scheduler = scheduler
1✔
378
        self._py_session = session
1✔
379
        self._goals = session.session_values.get(CurrentExecutingGoals) or CurrentExecutingGoals()
1✔
380

381
    @property
1✔
382
    def scheduler(self) -> Scheduler:
1✔
383
        return self._scheduler
1✔
384

385
    @property
1✔
386
    def py_scheduler(self) -> PyScheduler:
1✔
387
        return self._scheduler.py_scheduler
1✔
388

389
    @property
1✔
390
    def py_session(self) -> PySession:
1✔
391
        return self._py_session
1✔
392

393
    def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
1✔
UNCOV
394
        return SchedulerSession(
×
395
            self._scheduler,
396
            native_engine.session_isolated_shallow_clone(self._py_session, build_id),
397
        )
398

399
    def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
1✔
UNCOV
400
        result = native_engine.session_poll_workunits(
×
401
            self.py_scheduler, self.py_session, max_log_verbosity.level
402
        )
UNCOV
403
        return {"started": result[0], "completed": result[1]}
×
404

405
    def new_run_id(self) -> None:
1✔
406
        """Assigns a new "run id" to this Session, without creating a new Session.
407

408
        Usually each Session corresponds to one end user "run", but there are exceptions: notably,
409
        the `--loop` feature uses one Session, but would like to observe new values for uncacheable
410
        nodes in each iteration of its loop.
411
        """
UNCOV
412
        native_engine.session_new_run_id(self.py_session)
×
413

414
    def visualize_graph_to_file(self, filename: str) -> None:
1✔
415
        """Visualize a graph walk by writing graphviz `dot` output to a file."""
416
        native_engine.graph_visualize(self.py_scheduler, self.py_session, filename)
×
417

418
    def visualize_rule_graph_to_file(self, filename: str) -> None:
1✔
419
        self._scheduler.visualize_rule_graph_to_file(filename)
×
420

421
    def execution_request(
1✔
422
        self,
423
        requests: Sequence[tuple[type, Any | Params]],
424
        poll: bool = False,
425
        poll_delay: float | None = None,
426
        timeout: float | None = None,
427
    ) -> ExecutionRequest:
428
        """Create and return an ExecutionRequest for the given (product, subject) pairs.
429

430
        :param requests: A sequence of product types to request for subjects.
431
        :param poll: True to wait for _all_ of the given roots to
432
          have changed since their last observed values in this SchedulerSession.
433
        :param poll_delay: A delay (in seconds) to wait after observing a change, and before
434
          beginning to compute a new value.
435
        :param timeout: An optional timeout to wait for the request to complete (in seconds). If the
436
          request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
437
        :returns: An ExecutionRequest for the given products and subjects.
438
        """
439
        native_execution_request = PyExecutionRequest(
1✔
440
            poll=poll,
441
            poll_delay_in_ms=int(poll_delay * 1000) if poll_delay else None,
442
            timeout_in_ms=int(timeout * 1000) if timeout else None,
443
        )
444
        for product, subject in requests:
1✔
445
            self._scheduler.execution_add_root_select(native_execution_request, subject, product)
1✔
446
        return ExecutionRequest(native_execution_request)
1✔
447

448
    def invalidate_files(self, direct_filenames: Iterable[str]) -> int:
1✔
449
        """Invalidates the given filenames in an internal product Graph instance."""
450
        invalidated = self._scheduler.invalidate_files(direct_filenames)
1✔
451
        self._maybe_visualize()
1✔
452
        return invalidated
1✔
453

454
    def invalidate_all_files(self) -> int:
1✔
455
        """Invalidates all filenames in an internal product Graph instance."""
456
        invalidated = self._scheduler.invalidate_all_files()
×
457
        self._maybe_visualize()
×
458
        return invalidated
×
459

460
    def metrics(self) -> dict[str, int]:
1✔
461
        """Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
462
        return native_engine.scheduler_metrics(self.py_scheduler, self.py_session)
×
463

464
    def live_items(self) -> tuple[list[Any], dict[str, tuple[int, int]]]:
1✔
465
        """Return all Python objects held by the Scheduler."""
466
        return native_engine.scheduler_live_items(self.py_scheduler, self.py_session)
×
467

468
    def _maybe_visualize(self) -> None:
1✔
469
        if self._scheduler.visualize_to_dir is not None:
1✔
470
            # TODO: This increment-and-get is racey.
471
            name = f"graph.{self._scheduler._visualize_run_count:03d}.dot"
×
472
            self._scheduler._visualize_run_count += 1
×
473
            logger.info(f"Visualizing graph as {name}")
×
474
            self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))
×
475

476
    def teardown_dynamic_ui(self) -> None:
1✔
477
        native_engine.teardown_dynamic_ui(self.py_scheduler, self.py_session)
×
478

479
    def _execute(
1✔
480
        self, execution_request: ExecutionRequest
481
    ) -> tuple[tuple[Return, ...], tuple[Throw, ...]]:
482
        start_time = time.time()
1✔
483
        try:
1✔
484
            raw_roots = native_engine.scheduler_execute(
1✔
485
                self.py_scheduler,
486
                self.py_session,
487
                execution_request.native,
488
            )
489
        except native_engine.PollTimeout:
×
490
            raise ExecutionTimeoutError("Timed out")
×
491

492
        states = [
1✔
493
            (
494
                Throw(
495
                    raw_root.result,
496
                    python_traceback=raw_root.python_traceback,
497
                    engine_traceback=raw_root.engine_traceback,
498
                )
499
                if raw_root.is_throw
500
                else Return(raw_root.result)
501
            )
502
            for raw_root in raw_roots
503
        ]
504

505
        self._maybe_visualize()
1✔
506
        logger.debug(
1✔
507
            "computed %s nodes in %f seconds. there are %s total nodes.",
508
            len(states),
509
            time.time() - start_time,
510
            self._scheduler.graph_len(),
511
        )
512

513
        returns = tuple(state for state in states if isinstance(state, Return))
1✔
514
        throws = tuple(state for state in states if isinstance(state, Throw))
1✔
515
        return returns, throws
1✔
516

517
    def _raise_on_error(self, throws: Sequence[Throw]) -> NoReturn:
1✔
518
        exception_noun = pluralize(len(throws), "Exception")
1✔
519
        others_msg = f"\n(and {len(throws) - 1} more)\n" if len(throws) > 1 else ""
1✔
520
        raise ExecutionError(
1✔
521
            f"{exception_noun} encountered:\n\n"
522
            f"{throws[0].render(self._scheduler.include_trace_on_error)}\n"
523
            f"{others_msg}",
524
            wrapped_exceptions=tuple(t.exc for t in throws),
525
        )
526

527
    def execute(self, execution_request: ExecutionRequest) -> list[Any]:
1✔
528
        """Invoke the engine for the given ExecutionRequest, returning successful values or raising.
529

530
        :return: A sequence of per-request results.
531
        """
532
        returns, throws = self._execute(execution_request)
1✔
533

534
        # Throw handling.
535
        if throws:
1✔
536
            self._raise_on_error(throws)
1✔
537

538
        # Everything is a Return: we rely on the fact that roots are ordered to preserve subject
539
        # order in output lists.
540
        return [ret.value for ret in returns]
1✔
541

542
    def run_goal_rule(
1✔
543
        self,
544
        product: type[Goal],
545
        subject: Params,
546
        *,
547
        poll: bool = False,
548
        poll_delay: float | None = None,
549
    ) -> int:
550
        """
551
        :param product: A Goal subtype.
552
        :param subject: subject for the request.
553
        :param poll: See self.execution_request.
554
        :param poll_delay: See self.execution_request.
555
        :returns: An exit_code for the given Goal.
556
        """
557
        if self._scheduler.visualize_to_dir is not None:
1✔
558
            rule_graph_name = f"rule_graph.{product.name}.dot"
×
559
            params = self._scheduler._to_params_list(subject)
×
560
            self._scheduler.visualize_rule_subgraph_to_file(
×
561
                os.path.join(self._scheduler.visualize_to_dir, rule_graph_name),
562
                [type(p) for p in params],
563
                product,
564
            )
565
        with self._goals._execute(product):
1✔
566
            (return_value,) = self.product_request(
1✔
567
                product, subject, poll=poll, poll_delay=poll_delay
568
            )
569
        return cast(int, return_value.exit_code)
1✔
570

571
    def product_request(
1✔
572
        self,
573
        product: type,
574
        subject: Any | Params,
575
        *,
576
        poll: bool = False,
577
        poll_delay: float | None = None,
578
        timeout: float | None = None,
579
    ) -> list:
580
        """Executes a request for a single product for a subject, and returns the products.
581

582
        :param product: A product type for the request.
583
        :param subject: A subject or Params instance for the request.
584
        :param poll: See self.execution_request.
585
        :param poll_delay: See self.execution_request.
586
        :param timeout: See self.execution_request.
587
        :returns: A list of the requested products, with a single entry for the given subject.
        """
        request = self.execution_request(
            [(product, subject)],
            poll=poll,
            poll_delay=poll_delay,
            timeout=timeout,
        )
        return self.execute(request)

    def capture_snapshots(self, path_globs_and_roots: Iterable[PathGlobsAndRoot]) -> list[Snapshot]:
        """Synchronously captures Snapshots for each matching PathGlobs rooted at a its root
599
        directory.

        This is a blocking operation, and should be avoided where possible.
        """
        return native_engine.capture_snapshots(
            self.py_scheduler,
            self.py_session,
            _PathGlobsAndRootCollection(path_globs_and_roots),
        )

    def single_file_digests_to_bytes(self, digests: Sequence[FileDigest]) -> list[bytes]:
        return native_engine.single_file_digests_to_bytes(self.py_scheduler, list(digests))

    def snapshots_to_file_contents(
        self, snapshots: Sequence[Snapshot]
    ) -> tuple[DigestContents, ...]:
        """For each input `Snapshot`, yield a single `DigestContents` containing all the
        `FileContent`s corresponding to the file(s) contained within that `Snapshot`.

        Note that we cannot currently use a parallelized version of `self.product_request` since
        each snapshot needs to yield a separate `DigestContents`.
        """
        return tuple(
            self.product_request(DigestContents, snapshot.digest)[0] for snapshot in snapshots
        )

    def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
        native_engine.ensure_remote_has_recursive(self.py_scheduler, list(digests))

    def ensure_directory_digest_persisted(self, digest: Digest) -> None:
        native_engine.ensure_directory_digest_persisted(self.py_scheduler, digest)

    def write_digest(
        self, digest: Digest, *, path_prefix: str | None = None, clear_paths: Sequence[str] = ()
    ) -> None:
        """Write a digest to disk, relative to the build root."""
        if path_prefix and PurePath(path_prefix).is_absolute():
            raise ValueError(
                f"The `path_prefix` {path_prefix} must be a relative path, as the engine writes "
                "the digest relative to the build root."
            )
        native_engine.write_digest(
            self.py_scheduler, self.py_session, digest, path_prefix or "", clear_paths
        )

    def lease_files_in_graph(self) -> None:
        native_engine.lease_files_in_graph(self.py_scheduler, self.py_session)

    def garbage_collect_store(self, target_size_bytes: int) -> None:
        self._scheduler.garbage_collect_store(target_size_bytes)

    def get_metrics(self) -> dict[str, int]:
        return native_engine.session_get_metrics(self.py_session)

    def get_observation_histograms(self) -> dict[str, Any]:
        return native_engine.session_get_observation_histograms(self.py_scheduler, self.py_session)

    def record_test_observation(self, value: int) -> None:
        native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

    @property
    def is_cancelled(self) -> bool:
        return self.py_session.is_cancelled()

    def cancel(self) -> None:
        self.py_session.cancel()

    def wait_for_tail_tasks(self, timeout: float) -> None:
        native_engine.session_wait_for_tail_tasks(self.py_scheduler, self.py_session, timeout)


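# Illustrative sketches, not part of the original module: the convenience `product_request`
# wrapper versus explicitly batching several roots into one ExecutionRequest. `product` and
# the subjects are hypothetical placeholders.
def _example_single_request(session: SchedulerSession, product: type, subject: Any) -> list[Any]:
    # One-shot form: builds a single-root ExecutionRequest and executes it, raising
    # ExecutionError (or ExecutionTimeoutError) if the root fails.
    return session.product_request(product, Params(subject), timeout=60.0)


def _example_batched_request(
    session: SchedulerSession, product: type, subjects: Iterable[Any]
) -> list[Any]:
    # Batched form: several roots in one request, so the engine can compute them concurrently.
    # Results come back in the same order as the requested roots.
    request = session.execution_request([(product, Params(s)) for s in subjects], timeout=60.0)
    return session.execute(request)
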
def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
    """Create a native Tasks object loaded with given RuleIndex."""
    tasks = PyTasks()

    # Compute a reverse index of union membership.
    member_type_to_base_types = defaultdict(list)
    for base_type, member_types in union_membership.items():
        for member_type in member_types:
            member_type_to_base_types[member_type].append(base_type)

    # Compute map from union base type to rules that have one of its member types as a param.
    # The value is a list of pairs (rule, member type in that rule's params).
    base_type_to_member_rule_type_pairs: dict[type, list[tuple[TaskRule, type[Any]]]] = defaultdict(
        list
    )
    rule_id_to_rule: dict[str, TaskRule] = {}
    for task_rule in rule_index.rules:
        rule_id_to_rule[task_rule.canonical_name] = task_rule
        for param_type in task_rule.parameters.values():
            for base_type in member_type_to_base_types.get(param_type, tuple()):
                base_type_to_member_rule_type_pairs[base_type].append((task_rule, param_type))

    def register_task(rule: TaskRule) -> None:
        native_engine.tasks_task_begin(
            tasks,
            rule.func,
            rule.output_type,
            tuple(rule.parameters.items()),
            rule.masked_types,
            side_effecting=any(issubclass(t, SideEffecting) for t in rule.parameters.values()),
            engine_aware_return_type=issubclass(rule.output_type, EngineAwareReturnType),
            cacheable=rule.cacheable,
            name=rule.canonical_name,
            desc=rule.desc or "",
            level=rule.level.level,
        )

        for awaitable in rule.awaitables:
            unions = [t for t in awaitable.input_types if is_union(t)]
            if len(unions) == 1:
                union = unions[0]
                if rule_id_to_rule[awaitable.rule_id].polymorphic:
                    # This is a polymorphic call-by-name. Compute its vtable data, i.e., a list
                    # of pairs (union member type, id of implementation rule for that type).
                    member_rule_type_pairs = base_type_to_member_rule_type_pairs.get(union)
                    vtable_entries: list[tuple[type[Any], str]] = []
                    for member_rule, member_type in member_rule_type_pairs or []:
                        # If a rule has a union member as a param, and returns the relevant
                        # output type, then we take it to be the implementation of the
                        # polymorphic rule for the union member.
                        if member_rule.output_type == awaitable.output_type:
                            vtable_entries.append((member_type, member_rule.canonical_name))
                    in_scope_types = union_in_scope_types(union)
                    assert in_scope_types is not None
                    native_engine.tasks_add_call(
                        tasks,
                        awaitable.output_type,
                        awaitable.input_types,
                        awaitable.rule_id,
                        awaitable.explicit_args_arity,
                        vtable_entries,
                        in_scope_types,
                    )
            elif len(unions) > 1:
                raise TypeError(
                    f"Only one @union may be used in a call, but {awaitable} used: {unions}."
                )
            else:
                # Is a non-polymorphic call to a known rule.
                native_engine.tasks_add_call(
                    tasks,
                    awaitable.output_type,
                    awaitable.input_types,
                    awaitable.rule_id,
                    awaitable.explicit_args_arity,
                    vtable_entries=None,
                    in_scope_types=None,
                )

        native_engine.tasks_task_end(tasks)

    for task_rule in rule_index.rules:
        register_task(task_rule)
    for query in rule_index.queries:
        native_engine.tasks_add_query(
            tasks,
            query.output_type,
            query.input_types,
        )
    return tasks
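

# Illustrative sketch, not part of the original module: the reverse index built at the top of
# `register_rules`. `union_membership.items()` maps each union base type to its member types;
# the loop inverts that into member type -> base types. The toy types below are hypothetical.
def _example_reverse_union_index() -> dict[type, list[type]]:
    class Base: ...  # stand-in for a @union base type

    class MemberA(Base): ...  # stand-in member types

    class MemberB(Base): ...

    membership = {Base: (MemberA, MemberB)}  # same shape as union_membership.items()
    member_type_to_base_types: dict[type, list[type]] = defaultdict(list)
    for base_type, member_types in membership.items():
        for member_type in member_types:
            member_type_to_base_types[member_type].append(base_type)
    # -> {MemberA: [Base], MemberB: [Base]}
    return dict(member_type_to_base_types)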