pantsbuild / pants / build 24411696792 (push · github · web-flow)

14 Apr 2026 04:49PM UTC coverage: 92.91% (-0.004%) from 92.914%

fix: preserve workunit log level in local cache metadata update (#23251)

## Problem

When a process runs (e.g. `generate-lockfiles`), the local cache runner
updates the "Scheduling:" workunit metadata to attach cache digests. As
a side effect, it **overrides the workunit log level to `Info`**,
regardless of its original level. Since `ExecuteProcess` workunits
intentionally start at `Debug` (to avoid rendering until execution
actually begins), this promotion causes duplicate INFO-level output:

```
23:22:27.30 [INFO] Starting: Generate lockfile for pytest
23:22:53.66 [INFO] Completed: Generate lockfile for pytest (26.4s)
23:22:53.66 [INFO] Completed: Scheduling: Generate lockfile for pytest (26.4s)
```

The third line is redundant — the scheduling workunit is a wrapper whose
duration mirrors its child.

## Root Cause

This was inadvertently introduced in #22212, which added Action/Command
digest metadata to workunits. The `update_metadata` call needed to
attach `local_command` and `local_action` digests, but the closure also
hardcoded `Level::Info` as the new level — discarding the original. The
PR was focused on metadata, not log levels, and the level override was
not discussed in review.
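
To make the failure mode concrete, the following is a minimal, runnable model of the bug. It is illustrative only: the `Workunit`, `Metadata`, and `Level` types and the closure-based `update_metadata` signature here are assumptions sketched for this writeup, not the actual pants engine API.

```rust
// A self-contained model of the bug. All names are illustrative assumptions,
// not the pants source.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Level {
    Debug,
    Info,
}

#[derive(Clone, Default)]
struct Metadata {
    local_command: Option<String>,
    local_action: Option<String>,
}

struct Workunit {
    level: Level,
    metadata: Metadata,
}

impl Workunit {
    // Mirrors a closure-based metadata-update API: the callback receives the
    // current (metadata, level) pair and returns the replacement pair.
    fn update_metadata(&mut self, f: impl FnOnce((Metadata, Level)) -> (Metadata, Level)) {
        let (m, l) = f((self.metadata.clone(), self.level));
        self.metadata = m;
        self.level = l;
    }
}

fn main() {
    // "Scheduling:" workunits start at Debug so they don't render at INFO.
    let mut wu = Workunit { level: Level::Debug, metadata: Metadata::default() };

    // The buggy update: attaches the cache digests, but hardcodes the level.
    wu.update_metadata(|(mut m, _original_level)| {
        m.local_command = Some("command-digest".into());
        m.local_action = Some("action-digest".into());
        (m, Level::Info) // bug: unconditionally promotes the workunit to Info
    });

    // The original Debug level is lost, so the wrapper renders at INFO too.
    assert_eq!(wu.level, Level::Info);
    println!("final level: {:?}", wu.level);
}
```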

## Solution

Preserve the workunit's original log level when updating cache metadata,
instead of unconditionally overriding it to `Level::Info`. The metadata
update only needs to set `local_command` and `local_action` — it has no
reason to change the log level.
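
In terms of the sketch above, the corrected closure simply threads the level it was given back through:

```rust
// Continuing the illustrative model from the Root Cause section: the fixed
// closure attaches the digests but returns the original level untouched.
wu.update_metadata(|(mut m, original_level)| {
    m.local_command = Some("command-digest".into());
    m.local_action = Some("action-digest".into());
    (m, original_level) // preserve the workunit's original (Debug) level
});
```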

## Result

Default (INFO-level) output is now clean:

```
23:22:27.30 [INFO] Starting: Generate lockfile for pytest
23:22:53.66 [INFO] Completed: Generate lockfile for pytest
```

The "Scheduling:" message remains visible at `-ldebug` for debugging
purposes.

## LLM Disclosure

This PR was written with Amp (Claude). The LLM diagnosed the root cause
by tracing the workunit level flow through the codebase, identified the
accidental `Level::Info` override in #22212, and wrote the one-line fix.
I reviewed the d... (continued)

91623 of 98615 relevant lines covered (92.91%)

4.04 hits per line

Source file: /src/python/pants/engine/internals/engine_test.py (97.46% covered; lines not executed by this run are marked `# uncovered` below)

# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import itertools
import os
import time
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent

import pytest

from pants.backend.python.target_types import PythonSourcesGeneratorTarget
from pants.base.exceptions import IntrinsicError
from pants.base.specs import Specs
from pants.base.specs_parser import SpecsParser
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
from pants.engine.fs import (
    EMPTY_FILE_DIGEST,
    EMPTY_SNAPSHOT,
    CreateDigest,
    Digest,
    DigestContents,
    FileContent,
    MergeDigests,
    Snapshot,
)
from pants.engine.internals.engine_testutil import (
    WorkunitTracker,
    assert_equal_with_printing,
    remove_locations_from_traceback,
)
from pants.engine.internals.native_engine import PyExecutor, UnionMembership
from pants.engine.internals.scheduler import ExecutionError, Scheduler, SchedulerSession
from pants.engine.internals.selectors import concurrently
from pants.engine.intrinsics import create_digest, merge_digests
from pants.engine.process import Process, ProcessCacheScope, ProcessResult
from pants.engine.rules import implicitly, rule
from pants.engine.streaming_workunit_handler import (
    StreamingWorkunitContext,
    StreamingWorkunitHandler,
    TargetInfo,
    WorkunitsCallback,
)
from pants.engine.unions import UnionRule, union
from pants.goal.run_tracker import RunTracker
from pants.option.bootstrap_options import DEFAULT_EXECUTION_OPTIONS, DEFAULT_LOCAL_STORE_OPTIONS
from pants.testutil.option_util import create_options_bootstrapper
from pants.testutil.rule_runner import QueryRule, RuleRunner, engine_error
from pants.util.dirutil import safe_mkdtemp
from pants.util.logging import LogLevel
from pants.util.strutil import softwrap


class A:
    pass


class B:
    pass


class C:
    pass


class D:
    pass


def fn_raises(x):
    raise Exception(f"An exception for {type(x).__name__}")


@rule
async def nested_raise(x: B) -> A:  # type: ignore[return]
    fn_raises(x)


@dataclass(frozen=True)
class Fib:
    val: int


@rule(desc="Fibonacci", level=LogLevel.INFO)
async def fib(n: int) -> Fib:
    if n < 2:
        return Fib(n)
    x, y = tuple(await concurrently([fib(int(n - 2)), fib(int(n - 1))]))
    return Fib(x.val + y.val)


@dataclass(frozen=True)
class MyInt:
    val: int


@dataclass(frozen=True)
class MyFloat:
    val: float


@rule
async def upcast(n: MyInt) -> MyFloat:
    return MyFloat(float(n.val))  # uncovered


# This set of dummy types and the following `@rule`s are intended to test that workunits are
# being generated correctly and with the correct parent-child relationships.


class Input:
    pass


class Alpha:
    pass


class Beta:
    pass


class Gamma:
    pass


class Omega:
    pass


class Epsilon:
    pass


@rule(desc="Rule number 4", level=LogLevel.INFO)
async def rule_four(a: Alpha) -> Gamma:
    """This rule should be invoked in the body of `rule_two` and therefore its workunit should be a
    child of `rule_two`'s workunit."""
    return Gamma()


@rule(desc="Rule number 3", level=LogLevel.INFO)
async def rule_three(o: Omega) -> Beta:
    """This rule should be invoked in the body of `rule_one` and therefore its workunit should be a
    child of `rule_one`'s workunit."""
    return Beta()


@rule(desc="Rule number 2", level=LogLevel.INFO)
async def rule_two(a: Alpha) -> Omega:
    """This rule should be invoked in the body of `rule_one` and therefore its workunit should be a
    child of `rule_one`'s workunit."""
    await rule_four(a)
    return Omega()


@rule(canonical_name="canonical_rule_one", desc="Rule number 1", level=LogLevel.INFO)
async def rule_one_function(i: Input) -> Beta:
    """This rule should be the first one executed by the engine, and thus have no parent."""
    a = Alpha()
    o = await rule_two(a)
    b = await rule_three(o)
    time.sleep(1)
    return b


@rule(desc="Rule C", level=LogLevel.INFO)
async def rule_C(e: Epsilon) -> Alpha:
    return Alpha()


@rule
async def rule_B(o: Omega) -> Alpha:
    e = Epsilon()
    a = await rule_C(e)
    return a


@rule(desc="Rule A", level=LogLevel.INFO)
async def rule_A(i: Input) -> Alpha:
    o = Omega()
    a = await rule_B(o)
    return a


def mk_scheduler(
    tmp_path: Path,
    rules,
    include_trace_on_error: bool = True,
    max_workunit_verbosity: LogLevel = LogLevel.DEBUG,
) -> SchedulerSession:
    """Creates a SchedulerSession for a Scheduler with the given Rules installed."""
    _executor = PyExecutor(core_threads=2, max_threads=4)

    build_root = tmp_path / "build_root"
    build_root.mkdir(parents=True, exist_ok=True)

    local_execution_root_dir = os.path.realpath(safe_mkdtemp())
    named_caches_dir = os.path.realpath(safe_mkdtemp())
    scheduler = Scheduler(
        ignore_patterns=[],
        use_gitignore=False,
        build_root=build_root.as_posix(),
        pants_workdir=str(build_root / ".pants.d" / "workdir"),
        local_execution_root_dir=local_execution_root_dir,
        named_caches_dir=named_caches_dir,
        ca_certs_path=None,
        rules=rules,
        union_membership=UnionMembership.empty(),
        executor=_executor,
        execution_options=DEFAULT_EXECUTION_OPTIONS,
        local_store_options=DEFAULT_LOCAL_STORE_OPTIONS,
        include_trace_on_error=include_trace_on_error,
    )
    return scheduler.new_session(
        build_id="buildid_for_test",
        max_workunit_level=max_workunit_verbosity,
    )


def test_recursive_multi_get(tmp_path: Path) -> None:
    # Tests that a rule that "uses itself" multiple times per invoke works.
    rules = [fib, QueryRule(Fib, (int,))]
    (fib_10,) = mk_scheduler(tmp_path, rules=rules).product_request(Fib, subject=10)
    assert 55 == fib_10.val


def test_no_include_trace_error_raises_boring_error(tmp_path: Path) -> None:
    rules = [nested_raise, QueryRule(A, (B,))]
    scheduler = mk_scheduler(tmp_path, rules, include_trace_on_error=False)
    with pytest.raises(ExecutionError) as cm:
        list(scheduler.product_request(A, subject=B()))
    assert_equal_with_printing(
        "1 Exception encountered:\n\nException: An exception for B\n", str(cm.value)
    )


def test_include_trace_error_raises_error_with_trace(tmp_path: Path) -> None:
    rules = [nested_raise, QueryRule(A, (B,))]
    scheduler = mk_scheduler(tmp_path, rules, include_trace_on_error=True)
    with pytest.raises(ExecutionError) as cm:
        list(scheduler.product_request(A, subject=(B())))
    assert_equal_with_printing(
        dedent(
            """
            1 Exception encountered:

            Engine traceback:
              in root
                ..
              in pants.engine.internals.engine_test.nested_raise
                ..

            Traceback (most recent call last):
              File LOCATION-INFO, in nested_raise
                fn_raises(x)
                ~~~~~~~~~^^^
              File LOCATION-INFO, in fn_raises
                raise Exception(f"An exception for {type(x).__name__}")
            Exception: An exception for B

            """
        ).lstrip(),
        remove_locations_from_traceback(str(cm.value)),
    )


def test_nonexistent_root(tmp_path: Path) -> None:
    rules = [QueryRule(A, [B])]
    # No rules are available to compute A.
    with pytest.raises(ValueError) as cm:
        mk_scheduler(tmp_path, rules, include_trace_on_error=False)
    assert (
        "No installed rules return the type A, and it was not provided by potential callers of "
    ) in str(cm.value)


def test_missing_query_rule(tmp_path: Path) -> None:
    # Even if we register the rule to go from MyInt -> MyFloat, we must register a QueryRule
    # for the graph to work when making a synchronous call via `Scheduler.product_request`.
    scheduler = mk_scheduler(tmp_path, rules=[upcast], include_trace_on_error=False)
    with pytest.raises(Exception) as cm:
        scheduler.product_request(MyFloat, subject=MyInt(0))
    assert (
        "No installed QueryRules return the type MyFloat. Try registering QueryRule(MyFloat "
        "for MyInt)."
    ) in str(cm.value)


def new_run_tracker() -> RunTracker:
    # NB: A RunTracker usually observes "all options" (`full_options_for_scopes`), but it only
    # actually directly consumes bootstrap options.
    ob = create_options_bootstrapper([])
    return RunTracker(ob.args, ob.bootstrap_options)


@pytest.fixture
def run_tracker() -> RunTracker:
    return new_run_tracker()


def _fixture_for_rules(
    tmp_path: Path, rules, max_workunit_verbosity: LogLevel = LogLevel.INFO
) -> tuple[SchedulerSession, WorkunitTracker, StreamingWorkunitHandler]:
    scheduler = mk_scheduler(
        tmp_path,
        rules,
        include_trace_on_error=False,
        max_workunit_verbosity=max_workunit_verbosity,
    )
    tracker = WorkunitTracker()
    handler = StreamingWorkunitHandler(
        scheduler,
        run_tracker=new_run_tracker(),
        callbacks=[tracker],
        report_interval_seconds=0.01,
        max_workunit_verbosity=max_workunit_verbosity,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )
    return scheduler, tracker, handler


def test_streaming_workunits_reporting(tmp_path: Path) -> None:
    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path / "start", [fib, QueryRule(Fib, (int,))]
    )
    with handler:
        scheduler.product_request(Fib, subject=0)
    flattened = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    # The execution of the single named @rule "fib" should be providing this one workunit.
    assert len(flattened) == 1

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path / "second", [fib, QueryRule(Fib, (int,))]
    )
    with handler:
        scheduler.product_request(Fib, subject=10)

    # Requesting a bigger fibonacci number will result in more rule executions and thus
    # more reported workunits. In this case, we expect 11 invocations of the `fib` rule.
    flattened = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    assert len(flattened) == 11
    assert tracker.finished


def test_streaming_workunits_parent_id_and_rule_metadata(tmp_path: Path) -> None:
    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path,
        [rule_one_function, rule_two, rule_three, rule_four, QueryRule(Beta, (Input,))],
    )
    with handler:
        i = Input()
        scheduler.product_request(Beta, subject=i)
    assert tracker.finished

    # rule_one should complete well-after the other rules because of the artificial delay in
    # it caused by the sleep().
    assert {item["name"] for item in tracker.finished_workunit_chunks[0]} == {
        "pants.engine.internals.engine_test.rule_two",
        "pants.engine.internals.engine_test.rule_three",
        "pants.engine.internals.engine_test.rule_four",
    }

    # Because of the artificial delay in rule_one, it should have time to be reported as
    # started but not yet finished.
    started = list(itertools.chain.from_iterable(tracker.started_workunit_chunks))
    assert len([item for item in started if item["name"] == "canonical_rule_one"]) > 0

    assert {item["name"] for item in tracker.finished_workunit_chunks[1]} == {"canonical_rule_one"}

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

    r1 = next(item for item in finished if item["name"] == "canonical_rule_one")
    r2 = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_two"
    )
    r3 = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_three"
    )
    r4 = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_four"
    )

    # rule_one should have no parent_id because its actual parent workunit was filtered based
    # on level.
    assert r1.get("parent_id", None) is None

    assert r2["parent_id"] == r1["span_id"]
    assert r3["parent_id"] == r1["span_id"]
    assert r4["parent_id"] == r2["span_id"]

    assert r3["description"] == "Rule number 3"
    assert r4["description"] == "Rule number 4"
    assert r4["level"] == "INFO"


def test_streaming_workunit_log_levels(tmp_path: Path) -> None:
    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path,
        [rule_one_function, rule_two, rule_three, rule_four, QueryRule(Beta, (Input,))],
        max_workunit_verbosity=LogLevel.TRACE,
    )
    with handler:
        i = Input()
        scheduler.product_request(Beta, subject=i)

    assert tracker.finished
    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

    # With the max_workunit_verbosity set to TRACE, we should see the workunit corresponding
    # to the Select node.
    root = next(
        item
        for item in finished
        if item["name"]
        not in {
            "canonical_rule_one",
            "pants.engine.internals.engine_test.rule_two",
            "pants.engine.internals.engine_test.rule_three",
            "pants.engine.internals.engine_test.rule_four",
        }
    )
    assert root["name"] == "root"
    assert root["level"] == "TRACE"

    r1 = next(item for item in finished if item["name"] == "canonical_rule_one")
    assert r1["parent_id"] == root["span_id"]


def test_streaming_workunit_log_level_parent_rewrite(tmp_path: Path) -> None:
    rules = [rule_A, rule_B, rule_C, QueryRule(Alpha, (Input,))]

    scheduler, tracker, info_level_handler = _fixture_for_rules(tmp_path, rules)
    with info_level_handler:
        i = Input()
        scheduler.product_request(Alpha, subject=i)

    assert tracker.finished
    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

    assert len(finished) == 2
    r_A = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_A"
    )
    r_C = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_C"
    )
    assert "parent_id" not in r_A
    assert r_C["parent_id"] == r_A["span_id"]

    scheduler, tracker, debug_level_handler = _fixture_for_rules(
        tmp_path, rules, max_workunit_verbosity=LogLevel.TRACE
    )
    with debug_level_handler:
        i = Input()
        scheduler.product_request(Alpha, subject=i)

    assert tracker.finished
    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

    r_A = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_A"
    )
    r_B = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_B"
    )
    r_C = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.rule_C"
    )
    assert r_B["parent_id"] == r_A["span_id"]
    assert r_C["parent_id"] == r_B["span_id"]


def test_engine_aware_rule(tmp_path: Path) -> None:
    @dataclass(frozen=True)
    class ModifiedOutput(EngineAwareReturnType):
        _level: LogLevel
        val: int

        def level(self):
            return self._level

    @rule(desc="a_rule")
    async def a_rule(n: int) -> ModifiedOutput:
        return ModifiedOutput(val=n, _level=LogLevel.ERROR)

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path,
        [a_rule, QueryRule(ModifiedOutput, (int,))],
        max_workunit_verbosity=LogLevel.TRACE,
    )
    with handler:
        scheduler.product_request(ModifiedOutput, 0)

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"] == "pants.engine.internals.engine_test.test_engine_aware_rule.a_rule"
    )
    assert workunit["level"] == "ERROR"


def test_engine_aware_param(tmp_path: Path) -> None:
    @dataclass(frozen=True)
    class ModifiedMetadata(EngineAwareParameter):
        def metadata(self):
            return {"example": "thing"}

    @rule
    async def a_rule(_: ModifiedMetadata) -> int:
        return 1

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path,
        [a_rule, QueryRule(int, (ModifiedMetadata,))],
        max_workunit_verbosity=LogLevel.TRACE,
    )
    with handler:
        scheduler.product_request(int, subject=ModifiedMetadata())

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"] == "pants.engine.internals.engine_test.test_engine_aware_param.a_rule"
    )
    assert workunit["metadata"] == {"example": "thing"}


def test_engine_aware_none_case(tmp_path: Path) -> None:
    @dataclass(frozen=True)
    # If level() returns None, the engine shouldn't try to set
    # a new workunit level.
    class ModifiedOutput(EngineAwareReturnType):
        _level: LogLevel | None
        val: int

        def level(self):
            return self._level

    @rule(desc="a_rule")
    async def a_rule(n: int) -> ModifiedOutput:
        return ModifiedOutput(val=n, _level=None)

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path,
        [a_rule, QueryRule(ModifiedOutput, (int,))],
        max_workunit_verbosity=LogLevel.TRACE,
    )
    with handler:
        scheduler.product_request(ModifiedOutput, subject=0)

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"] == "pants.engine.internals.engine_test.test_engine_aware_none_case.a_rule"
    )
    assert workunit["level"] == "TRACE"


def test_artifacts_on_engine_aware_type(tmp_path: Path) -> None:
    @dataclass(frozen=True)
    class Output(EngineAwareReturnType):
        val: int

        def artifacts(self):
            return {"some_arbitrary_key": EMPTY_SNAPSHOT}

    @rule(desc="a_rule")
    async def a_rule(n: int) -> Output:
        return Output(val=n)

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path, [a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
    )
    with handler:
        scheduler.product_request(Output, subject=0)

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"]
        == "pants.engine.internals.engine_test.test_artifacts_on_engine_aware_type.a_rule"
    )
    artifacts = workunit["artifacts"]
    assert artifacts["some_arbitrary_key"] == EMPTY_SNAPSHOT


def test_metadata_on_engine_aware_type(tmp_path: Path) -> None:
    @dataclass(frozen=True)
    class Output(EngineAwareReturnType):
        val: int

        def metadata(self):
            return {"k1": 1, "k2": "a string", "k3": [1, 2, 3]}

    @rule(desc="a_rule")
    async def a_rule(n: int) -> Output:
        return Output(val=n)

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path, [a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
    )
    with handler:
        scheduler.product_request(Output, subject=0)

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"]
        == "pants.engine.internals.engine_test.test_metadata_on_engine_aware_type.a_rule"
    )

    metadata = workunit["metadata"]
    assert metadata == {"k1": 1, "k2": "a string", "k3": [1, 2, 3]}


def test_metadata_non_string_key_behavior(tmp_path: Path) -> None:
    # If someone passes a non-string key in a metadata() method,
    # this should fail to produce a meaningful metadata entry on
    # the workunit (with a warning), but not fail.

    @dataclass(frozen=True)
    class Output(EngineAwareReturnType):
        val: int

        def metadata(self):
            return {10: "foo", "other_key": "other value"}

    @rule(desc="a_rule")
    async def a_rule(n: int) -> Output:
        return Output(val=n)

    scheduler, tracker, handler = _fixture_for_rules(
        tmp_path, [a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
    )
    with handler:
        scheduler.product_request(Output, subject=0)

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item
        for item in finished
        if item["name"]
        == "pants.engine.internals.engine_test.test_metadata_non_string_key_behavior.a_rule"
    )

    assert workunit["metadata"] == {}


@dataclass(frozen=True)
class ComplicatedInput:
    snapshot_1: Snapshot
    snapshot_2: Snapshot


@dataclass(frozen=True)
class Output(EngineAwareReturnType):
    snapshot_1: Snapshot
    snapshot_2: Snapshot

    def artifacts(self):
        return {"snapshot_1": self.snapshot_1, "snapshot_2": self.snapshot_2}


@rule(desc="a_rule", level=LogLevel.DEBUG)
async def a_rule(input: ComplicatedInput) -> Output:
    return Output(snapshot_1=input.snapshot_1, snapshot_2=input.snapshot_2)


@pytest.fixture
def rule_runner() -> RuleRunner:
    return RuleRunner(
        rules=[
            a_rule,
            QueryRule(Output, (ComplicatedInput,)),
            QueryRule(ProcessResult, (Process,)),
        ],
        isolated_local_store=True,
        # NB: The Sessions's configured verbosity is applied before a `StreamingWorkunitHandler`
        # can poll, and prevents things from being stored at all. So in order to observe TRACE
        # workunits in a poll, we must also configure TRACE on the Session.
        max_workunit_verbosity=LogLevel.TRACE,
    )


def test_counters(rule_runner: RuleRunner, run_tracker: RunTracker) -> None:
    scheduler = rule_runner.scheduler

    tracker = WorkunitTracker()
    handler = StreamingWorkunitHandler(
        scheduler,
        run_tracker=run_tracker,
        callbacks=[tracker],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.TRACE,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )

    with handler:
        scheduler.record_test_observation(128)
        rule_runner.request(
            ProcessResult,
            [
                Process(
                    ["/bin/sh", "-c", "true"],
                    description="always true",
                    cache_scope=ProcessCacheScope.PER_SESSION,
                )
            ],
        )
        metrics_info = scheduler.get_metrics()
        histograms_info = scheduler.get_observation_histograms()

    assert metrics_info["local_cache_requests"] == 1
    assert metrics_info["local_cache_requests_uncached"] == 1
    assert metrics_info["local_execution_requests"] == 1

    assert histograms_info["version"] == 0
    assert "histograms" in histograms_info
    assert "test_observation" in histograms_info["histograms"]
    assert (
        histograms_info["histograms"]["test_observation"]
        == b"\x1c\x84\x93\x14\x00\x00\x00\x1fx\x9c\x93i\x99,\xcc\xc0\xc0\xc0\xcc\x00\x010\x9a\x11J3\xd9\x7f\x800\xfe32\x01\x00E\x0c\x03\x81"
    )


def test_more_complicated_engine_aware(rule_runner: RuleRunner, run_tracker: RunTracker) -> None:
    tracker = WorkunitTracker()
    handler = StreamingWorkunitHandler(
        rule_runner.scheduler,
        run_tracker=run_tracker,
        callbacks=[tracker],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.TRACE,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )
    with handler:
        input_1 = CreateDigest(
            (
                FileContent(path="a.txt", content=b"alpha"),
                FileContent(path="b.txt", content=b"beta"),
            )
        )
        digest_1 = rule_runner.request(Digest, [input_1])
        snapshot_1 = rule_runner.request(Snapshot, [digest_1])

        input_2 = CreateDigest((FileContent(path="g.txt", content=b"gamma"),))
        digest_2 = rule_runner.request(Digest, [input_2])
        snapshot_2 = rule_runner.request(Snapshot, [digest_2])

        input = ComplicatedInput(snapshot_1=snapshot_1, snapshot_2=snapshot_2)

        rule_runner.request(Output, [input])

    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    workunit = next(
        item for item in finished if item["name"] == "pants.engine.internals.engine_test.a_rule"
    )

    artifacts = workunit["artifacts"]
    output_snapshot_1 = artifacts["snapshot_1"]
    output_snapshot_2 = artifacts["snapshot_2"]

    output_contents_list = handler.context.snapshots_to_file_contents(
        [output_snapshot_1, output_snapshot_2]
    )
    assert len(output_contents_list) == 2

    assert isinstance(output_contents_list[0], DigestContents)
    assert isinstance(output_contents_list[1], DigestContents)

    digest_contents_1 = output_contents_list[0]
    digest_contents_2 = output_contents_list[1]

    assert len(tuple(x for x in digest_contents_1 if x.content == b"alpha")) == 1
    assert len(tuple(x for x in digest_contents_1 if x.content == b"beta")) == 1

    assert len(tuple(x for x in digest_contents_2 if x.content == b"gamma")) == 1


def test_process_digests_on_streaming_workunits(
    rule_runner: RuleRunner, run_tracker: RunTracker
) -> None:
    scheduler = rule_runner.scheduler

    tracker = WorkunitTracker()
    handler = StreamingWorkunitHandler(
        scheduler,
        run_tracker=run_tracker,
        callbacks=[tracker],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.DEBUG,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )

    stdout_process = Process(
        argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
    )

    with handler:
        result = rule_runner.request(ProcessResult, [stdout_process])

    assert tracker.finished
    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

    process_workunit = next(item for item in finished if item["name"] == "process")
    assert process_workunit is not None
    stdout_digest = process_workunit["artifacts"]["stdout_digest"]
    stderr_digest = process_workunit["artifacts"]["stderr_digest"]

    assert result.stdout == b"stdout output\n"
    assert stderr_digest == EMPTY_FILE_DIGEST
    assert stdout_digest.serialized_bytes_length == len(result.stdout)

    tracker = WorkunitTracker()
    handler = StreamingWorkunitHandler(
        scheduler,
        run_tracker=run_tracker,
        callbacks=[tracker],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.DEBUG,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )
    stderr_process = Process(
        argv=("/bin/bash", "-c", "1>&2 /bin/echo 'stderr output'"), description="Stderr process"
    )
    with handler:
        result = rule_runner.request(ProcessResult, [stderr_process])

    assert tracker.finished
    finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
    process_workunit = next(item for item in finished if item["name"] == "process")

    assert process_workunit is not None
    stdout_digest = process_workunit["artifacts"]["stdout_digest"]
    stderr_digest = process_workunit["artifacts"]["stderr_digest"]

    assert result.stderr == b"stderr output\n"
    assert stdout_digest == EMPTY_FILE_DIGEST
    assert stderr_digest.serialized_bytes_length == len(result.stderr)

    assert process_workunit["metadata"]["exit_code"] == 0

    try:
        scheduler.ensure_remote_has_recursive([stdout_digest, stderr_digest])
    except Exception as e:
        # This is the exception message we should expect from invoking ensure_remote_has_recursive()
        # in rust.
        assert str(e) == "Cannot ensure remote has blobs without a remote"

    byte_outputs = scheduler.single_file_digests_to_bytes([stdout_digest, stderr_digest])
    assert byte_outputs[0] == result.stdout
    assert byte_outputs[1] == result.stderr


def test_context_object_on_streaming_workunits(
    rule_runner: RuleRunner, run_tracker: RunTracker
) -> None:
    scheduler = rule_runner.scheduler

    class Callback(WorkunitsCallback):
        @property
        def can_finish_async(self) -> bool:
            return False  # uncovered

        def __call__(self, **kwargs) -> None:
            context = kwargs["context"]
            assert isinstance(context, StreamingWorkunitContext)

            completed_workunits = kwargs["completed_workunits"]
            for workunit in completed_workunits:
                if "artifacts" in workunit and "stdout_digest" in workunit["artifacts"]:
                    digest = workunit["artifacts"]["stdout_digest"]  # uncovered
                    output = context.single_file_digests_to_bytes([digest])  # uncovered
                    assert output == [b"stdout output\n"]  # uncovered

    handler = StreamingWorkunitHandler(
        scheduler,
        run_tracker=run_tracker,
        callbacks=[Callback()],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.INFO,
        specs=Specs.empty(),
        options_bootstrapper=create_options_bootstrapper([]),
        allow_async_completion=False,
    )
    stdout_process = Process(
        argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
    )
    with handler:
        rule_runner.request(ProcessResult, [stdout_process])


def test_streaming_workunits_expanded_specs(run_tracker: RunTracker) -> None:
    rule_runner = RuleRunner(
        target_types=[PythonSourcesGeneratorTarget],
        rules=[
            QueryRule(ProcessResult, (Process,)),
        ],
    )
    rule_runner.set_options(["--backend-packages=pants.backend.python"])
    rule_runner.write_files(
        {
            "src/python/somefiles/BUILD": "python_sources()",
            "src/python/somefiles/a.py": "print('')",
            "src/python/somefiles/b.py": "print('')",
            "src/python/others/BUILD": "python_sources()",
            "src/python/others/a.py": "print('')",
            "src/python/others/b.py": "print('')",
        }
    )
    specs = SpecsParser().parse_specs(
        ["src/python/somefiles::", "src/python/others/b.py"], description_of_origin="tests"
    )

    class Callback(WorkunitsCallback):
        @property
        def can_finish_async(self) -> bool:
            return False  # uncovered

        def __call__(self, **kwargs) -> None:
            context = kwargs["context"]
            assert isinstance(context, StreamingWorkunitContext)

            expanded = context.get_expanded_specs()
            targets = expanded.targets  # uncovered

            assert len(targets.keys()) == 2  # uncovered
            assert targets["src/python/others/b.py"] == [  # uncovered
                TargetInfo(filename="src/python/others/b.py")
            ]
            assert set(targets["src/python/somefiles"]) == {  # uncovered
                TargetInfo(filename="src/python/somefiles/a.py"),
                TargetInfo(filename="src/python/somefiles/b.py"),
            }

    handler = StreamingWorkunitHandler(
        scheduler=rule_runner.scheduler,
        run_tracker=run_tracker,
        callbacks=[Callback()],
        report_interval_seconds=0.01,
        max_workunit_verbosity=LogLevel.INFO,
        specs=specs,
        options_bootstrapper=create_options_bootstrapper(
            ["--backend-packages=pants.backend.python"]
        ),
        allow_async_completion=False,
    )
    stdout_process = Process(
        argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
    )
    with handler:
        rule_runner.request(ProcessResult, [stdout_process])


@union
class Union:
    pass


@dataclass(frozen=True)
class Str:
    val: str


@rule(polymorphic=True)
async def to_str(_: Union) -> Str:
    raise NotImplementedError()  # uncovered


class Member(Union):
    pass


def test_union_member_construction(run_tracker: RunTracker) -> None:
    """Use a union member which is a subclass of its @union as polymorphic input."""

    @rule
    async def output(_: Member) -> Str:
        return Str("yep")

    @rule
    async def for_member() -> str:
        ret = await to_str(**implicitly({Member(): Union}))
        return ret.val

    rule_runner = RuleRunner(
        target_types=[],
        rules=[
            UnionRule(Union, Member),
            QueryRule(str, ()),
            to_str,
            output,
            for_member,
        ],
    )

    assert "yep" == rule_runner.request(str, [])


@dataclass(frozen=True)
class FileInput:
    filename: str


@dataclass(frozen=True)
class MergedOutput:
    digest: Digest


class MergeErr(Exception):
    pass


@rule
async def catch_merge_digests_error(file_input: FileInput) -> MergedOutput:
    # Create two separate digests writing different contents to the same file path.
    input_1 = CreateDigest((FileContent(path=file_input.filename, content=b"yes"),))
    input_2 = CreateDigest((FileContent(path=file_input.filename, content=b"no"),))
    digests = await concurrently(create_digest(input_1), create_digest(input_2))
    try:
        merged = await merge_digests(MergeDigests(digests))
    except IntrinsicError as e:
        raise MergeErr(f"error merging digests for input {file_input}: {e}")
    return MergedOutput(merged)  # uncovered


def test_catch_intrinsic_error() -> None:
    rule_runner = RuleRunner(
        rules=[catch_merge_digests_error, QueryRule(MergedOutput, (FileInput,))]
    )
    msg = softwrap(
        """\
        error merging digests for input FileInput(filename='some-file.txt'): Can only merge
        Directories with no duplicates, but found 2 duplicate entries in :
        """
    )
    with engine_error(MergeErr, contains=msg):
        rule_runner.request(MergedOutput, (FileInput("some-file.txt"),))