• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 22285099215

22 Feb 2026 08:52PM UTC coverage: 75.854% (-17.1%) from 92.936%
22285099215

Pull #23121

github

web-flow
Merge c7299df9c into ba8359840
Pull Request #23121: fix issue with optional fields in dependency validator

28 of 29 new or added lines in 2 files covered. (96.55%)

11174 existing lines in 400 files now uncovered.

53694 of 70786 relevant lines covered (75.85%)

1.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.15
/src/python/pants/core/util_rules/adhoc_process_support.py
1
# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from __future__ import annotations
3✔
4

5
import dataclasses
3✔
6
import hashlib
3✔
7
import json
3✔
8
import logging
3✔
9
import os
3✔
10
import shlex
3✔
11
from collections.abc import Iterable, Mapping
3✔
12
from dataclasses import dataclass
3✔
13
from datetime import datetime
3✔
14
from enum import Enum
3✔
15
from textwrap import dedent  # noqa: PNT20
3✔
16
from typing import TypeVar
3✔
17

18
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
3✔
19
from pants.build_graph.address import Address
3✔
20
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
3✔
21
from pants.core.goals.package import (
3✔
22
    EnvironmentAwarePackageRequest,
23
    PackageFieldSet,
24
    environment_aware_package,
25
)
26
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
3✔
27
from pants.core.target_types import FileSourceField
3✔
28
from pants.core.util_rules.env_vars import environment_vars_subset
3✔
29
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
3✔
30
from pants.core.util_rules.system_binaries import BashBinary
3✔
31
from pants.engine import process
3✔
32
from pants.engine.addresses import Addresses, UnparsedAddressInputs
3✔
33
from pants.engine.env_vars import EnvironmentVarsRequest
3✔
34
from pants.engine.environment import EnvironmentName
3✔
35
from pants.engine.fs import (
3✔
36
    EMPTY_DIGEST,
37
    CreateDigest,
38
    Digest,
39
    DigestSubset,
40
    Directory,
41
    FileContent,
42
    GlobExpansionConjunction,
43
    MergeDigests,
44
    PathGlobs,
45
    PathMetadataRequest,
46
    Snapshot,
47
)
48
from pants.engine.internals.build_files import resolve_address
3✔
49
from pants.engine.internals.graph import (
3✔
50
    find_valid_field_sets,
51
    resolve_target,
52
    resolve_targets,
53
    resolve_unparsed_address_inputs,
54
    transitive_targets,
55
)
56
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
3✔
57
from pants.engine.intrinsics import (
3✔
58
    create_digest,
59
    digest_subset_to_digest,
60
    digest_to_snapshot,
61
    execute_process,
62
    merge_digests,
63
    path_globs_to_paths,
64
    path_metadata_request,
65
    remove_prefix,
66
)
67
from pants.engine.process import (
3✔
68
    FallibleProcessResult,
69
    Process,
70
    ProcessCacheScope,
71
    ProcessResult,
72
    ProductDescription,
73
    fallible_to_exec_result_or_raise,
74
)
75
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
3✔
76
from pants.engine.target import (
3✔
77
    FieldSetsPerTargetRequest,
78
    InvalidFieldException,
79
    SourcesField,
80
    Target,
81
    TransitiveTargetsRequest,
82
    WrappedTargetRequest,
83
)
84
from pants.util.frozendict import FrozenDict
3✔
85

86
logger = logging.getLogger(__name__)
3✔
87

88

89
@dataclass(frozen=True)
class AdhocProcessRequest:
    """Parameters describing a sandboxed adhoc process to prepare and run.

    Consumed by `prepare_adhoc_process` (which builds the `Process`) and
    `run_prepared_adhoc_process` (which executes it and post-processes outputs).
    """

    description: str
    address: Address
    # Resolved relative to `address` via `parse_relative_directory`.
    working_directory: str
    # Prefix stripped from captured outputs when
    # `use_working_directory_as_base_for_output_captures` is true.
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest] | None
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    env_vars: FrozenDict[str, str]
    # Maps an exit code to an extra message logged via `logger.error` when that code occurs.
    log_on_process_errors: FrozenDict[int, str] | None
    # When true, the process's stdout/stderr are logged after execution.
    log_output: bool
    # Sandbox-relative file names into which stdout/stderr are captured, if set.
    capture_stdout_file: str | None
    capture_stderr_file: str | None
    # Globs whose file metadata is hashed into the process env so the process is
    # invalidated when those workspace files change.
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    use_working_directory_as_base_for_output_captures: bool = True
    # How to report output globs that matched nothing (see `check_outputs`).
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    # `all_match` requires every output glob to match something; `any_match` requires
    # at least one; `None` disables the output check entirely.
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match
112

113

114
@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    """Pairs an `AdhocProcessRequest` with the concrete `Process` built from it."""

    # The request the process was derived from; retained for post-run handling.
    original_request: AdhocProcessRequest
    # The engine `Process` ready for execution.
    process: Process
118

119

120
@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    """Result of an adhoc process that may have failed (non-zero exit)."""

    process_result: FallibleProcessResult
    # Output digest after capture files are merged in and any root output prefix removed.
    adjusted_digest: Digest
    # Human-readable description used when converting to an error message.
    description: str
125

126

127
@dataclass(frozen=True)
class AdhocProcessResult:
    """Successful adhoc process result (see `convert_fallible_adhoc_process_result`)."""

    process_result: ProcessResult
    # Post-processed output digest (capture files merged, output prefix stripped).
    adjusted_digest: Digest
131

132

133
@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    """Request to resolve a target's execution and runnable dependencies into a sandbox digest."""

    # Owning target address; dependency specs are resolved relative to it.
    address: Address
    # Address specs of targets whose sources/packages are placed in the sandbox; `None` means none.
    execution_dependencies: tuple[str, ...] | None
    # Address specs of runnable targets exposed to the process as shim executables.
    runnable_dependencies: tuple[str, ...] | None
138

139

140
@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    """Resolved sandbox inputs produced by `resolve_execution_environment`."""

    # Merged digest of dependency sources, built packages, and runnable-shim support files.
    digest: Digest
    # Extra sandbox config for runnable dependencies, or `None` when there are none.
    runnable_dependencies: RunnableDependencies | None
144

145

146
@dataclass(frozen=True)
class RunnableDependencies:
    """Sandbox additions that expose runnable dependencies as shim executables."""

    # Directory (immutable-input key) holding the generated shim scripts; added to PATH by callers.
    path_component: str
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    # Includes `_PANTS_SHIM_ROOT`, which the shims expand in place of `{chroot}`.
    extra_env: Mapping[str, str]
152

153

154
@dataclass(frozen=True)
class ToolRunnerRequest:
    """Request to build a `ToolRunner` for a runnable target (see `create_tool_runner`)."""

    # Address spec of the runnable target, resolved relative to `target`'s spec path.
    runnable_address_str: str
    # Extra arguments appended after the runnable's own args.
    args: tuple[str, ...]
    execution_dependencies: tuple[str, ...]
    runnable_dependencies: tuple[str, ...]
    # The target on whose behalf the tool is being run (used for environment resolution).
    target: Target
    # Field alias used in error messages when `runnable_address_str` is not runnable.
    runnable_address_field_alias: str
    # Additional named caches merged over the runnable's own append-only caches.
    named_caches: FrozenDict[str, str] | None = None
163

164

165
@dataclass(frozen=True)
class ToolRunner:
    """Everything needed to invoke a resolved tool inside a sandbox."""

    # Merged digest of the runnable itself plus its execution dependencies.
    digest: Digest
    # Full argv: the runnable's args followed by the request's extra args.
    args: tuple[str, ...]
    extra_env: FrozenDict[str, str]
    # Extra PATH entries (runnable's PATH plus any runnable-dependency shim dir).
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]
173

174

175
#
176
# Things that need a home
177
#
178

179

180
@dataclass(frozen=True)
class ExtraSandboxContents:
    """Additional contents/configuration to inject into a process sandbox."""

    digest: Digest
    # Extra PATH entries.
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]
187

188

189
@dataclass(frozen=True)
class MergeExtraSandboxContents:
    """Request to merge several `ExtraSandboxContents` (see `merge_extra_sandbox_contents`)."""

    # Contents to merge, in order.
    additions: tuple[ExtraSandboxContents, ...]
192

193

194
@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    """Combine several `ExtraSandboxContents` into one merged value.

    Digests are merged by the engine; `paths` are concatenated in input order; the
    immutable-input-digest, append-only-cache, and env mappings are merged with
    `_safe_update`, which raises if the same key appears with conflicting values.
    """
    merged_digests: list[Digest] = []
    merged_paths: list[str] = []
    merged_immutable: dict[str, Digest] = {}
    merged_caches: dict[str, str] = {}
    merged_env: dict[str, str] = {}

    for contents in request.additions:
        merged_digests.append(contents.digest)
        if contents.paths:
            merged_paths.extend(contents.paths)
        _safe_update(merged_immutable, contents.immutable_input_digests)
        _safe_update(merged_caches, contents.append_only_caches)
        _safe_update(merged_env, contents.extra_env)

    combined_digest = await merge_digests(MergeDigests(merged_digests))

    return ExtraSandboxContents(
        digest=combined_digest,
        paths=tuple(merged_paths),
        immutable_input_digests=FrozenDict(merged_immutable),
        append_only_caches=FrozenDict(merged_caches),
        extra_env=FrozenDict(merged_env),
    )
221

222

223
#
224
# END THINGS THAT NEED A HOME
225
#
226

227

228
@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    """Convert a fallible adhoc result into an infallible `AdhocProcessResult`.

    Delegates to `fallible_to_exec_result_or_raise`, supplying the stored
    description as the `ProductDescription` used in the failure message; raises
    if the underlying process failed.
    """
    result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    return AdhocProcessResult(
        process_result=result, adjusted_digest=fallible_result.adjusted_digest
    )
243

244

245
async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    """Resolve runnable-dependency address specs into sandbox contents plus shim scripts.

    Each runnable dependency gets an executable bash shim named after its target
    name; the shims are collected under a fingerprint-keyed immutable input
    directory. Returns `(EMPTY_DIGEST, None)` when `deps` is empty or `None`.

    Raises `ValueError` if any resolved address is not runnable (no `RunFieldSet`).
    """
    if not deps:
        return EMPTY_DIGEST, None

    # Resolve the raw specs relative to the owning target's address.
    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    fspt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    # Every runnable dependency must actually support the `run` goal.
    for address, field_set in zip(addresses, fspt.collection):
        if not field_set:
            raise ValueError(
                dedent(
                    f"""\
                    Address `{address.spec}` was specified as a runnable dependency, but is not
                    runnable.
                    """
                )
            )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_set[0]: RunFieldSet}))
        for field_set in fspt.collection
    )

    shims: list[FileContent] = []
    extras: list[ExtraSandboxContents] = []

    for address, runnable in zip(addresses, runnables):
        # Sandbox contents contributed by the runnable itself (no PATH/env here;
        # env is baked into the shim script instead).
        extras.append(
            ExtraSandboxContents(
                digest=runnable.digest,
                paths=(),
                immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
                append_only_caches=FrozenDict(runnable.append_only_caches or {}),
                extra_env=FrozenDict(),
            )
        )
        # Executable shim named after the target so the process can invoke it by name.
        shims.append(
            FileContent(
                address.target_name,
                _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
                is_executable=True,
            )
        )

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    # Key the shim directory by its digest fingerprint so distinct shim sets don't collide.
    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    return (
        merged_extras.digest,
        RunnableDependencies(
            shim_digest_path,
            FrozenDict(immutable_input_digests),
            merged_extras.append_only_caches,
            # The shims substitute `{chroot}` with this env var at run time.
            FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
        ),
    )
320

321

322
@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    """Resolve a target's execution/runnable dependencies into a single sandbox digest.

    Gathers (transitively) the sources and built packages of the execution
    dependencies, resolves runnable dependencies into shims via
    `_resolve_runnable_dependencies`, and merges everything into one digest.
    """
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    # Exclude the owning target itself from its own dependency closure.
    # NOTE(review): this uses `is not` (identity) rather than `!=` — presumably
    # relies on Address instances being interned; confirm.
    all_dependencies = (
        *(i for i in transitive.roots if i.address is not target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    # Build any packageable dependencies so their artifacts land in the sandbox.
    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)
2✔
381

382

383
K = TypeVar("K")
3✔
384
V = TypeVar("V")
3✔
385

386

387
def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
3✔
388
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
389
    dictionaries, but with a different value."""
390

391
    for k, v in d2.items():
2✔
392
        if k in d1 and d1[k] != v:
2✔
393
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
×
394
        d1[k] = v
2✔
395
    return d1
2✔
396

397

398
def _runnable_dependency_shim(
3✔
399
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
400
) -> bytes:
401
    """The binary shim script to be placed in the output directory for the digest."""
402

403
    def _quote(s: str) -> str:
2✔
404
        quoted = shlex.quote(s)
2✔
405
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")
2✔
406

407
    binary = " ".join(_quote(arg) for arg in args)
2✔
408
    env_str = "\n".join(
2✔
409
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
410
    )
411
    return dedent(
2✔
412
        f"""\
413
        #!{bash}
414
        {env_str}
415
        exec {binary} "$@"
416
        """
417
    ).encode()
418

419

420
@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    """Resolve a runnable target and its dependencies into a ready-to-run `ToolRunner`.

    Resolves the runnable address, verifies it supports the `run` goal, resolves
    execution/runnable dependencies in the requesting target's environment, and
    merges all sandbox contents (digests, env, PATH entries, caches).

    Raises `InvalidFieldException` if the referenced target is not runnable.
    """
    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=request.target.address.spec_path,
                    description_of_origin=f"Runnable target for {request.target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(request.target), **implicitly()
        ),
    )

    if not run_field_sets.field_sets:
        raise InvalidFieldException(
            f"The `{request.runnable_address_field_alias}` field in target `{request.target.address}` must be set to the "
            f"address of a target which can be run by the `run` goal. Instead, the `{request.runnable_address_field_alias}` field is "
            f"set to `{request.runnable_address_str}` which refers to the `{runnable_targets[0].alias}` target "
            f"at `{runnable_targets[0].address}` that cannot be run by the `run` goal."
        )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    req = ResolveExecutionDependenciesRequest(
        address=request.target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    # Must be run in target environment so that the binaries/envvars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    dependencies_digest = execution_environment.digest
    runnable_dependencies = execution_environment.runnable_dependencies

    # PATH is handled separately (as `paths`) rather than as a plain env var.
    extra_env: dict[str, str] = dict(run_request.extra_env or {})
    extra_path = extra_env.pop("PATH", None)

    extra_sandbox_contents = []

    extra_sandbox_contents.append(
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(extra_path,) if extra_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    )

    if runnable_dependencies:
        # Expose the shim directory on PATH so dependency shims are invocable by name.
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((dependencies_digest, run_request.digest))),
    )

    extra_env = dict(merged_extras.extra_env)

    # Request-supplied named caches take precedence over the merged ones.
    append_only_caches = {
        **merged_extras.append_only_caches,
        **(request.named_caches or {}),
    }

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(extra_env),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )
524

525

526
async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist.

    With `all_match`, every output file/directory glob must have matched; with
    `any_match`, at least one must have. Depending on
    `outputs_match_error_behavior`, a violation raises `ValueError` or logs a
    warning. A `None` mode disables the check entirely.
    """

    if outputs_match_mode is None:
        return

    # Subset the digest per glob so each glob can be checked for emptiness individually.
    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    # A directory glob matches if the directory itself or anything under it exists.
    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    filtered_output_files = tuple(zip(output_files, filtered_output_files_digests))
    filtered_output_directories = tuple(zip(output_directories, filtered_output_directory_digests))

    # A glob is "unused" when its subset of the output digest is empty.
    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in filtered_output_files
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in filtered_output_directories
        if digest == EMPTY_DIGEST
    )

    def warn_or_raise(message: str, snapshot: Snapshot) -> None:
        # Append the unused globs and a (trimmed) listing of what the process
        # actually produced, then raise or warn per the configured behavior.
        unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
        message = f"{message}\n\nThe following output globs were unused: {unused_globs_str}"

        if snapshot.dirs:
            message += f"\n\nDirectories in output ({len(snapshot.dirs)} total):"
            # NOTE(review): sorts by `os.pathsep` (':') count — presumably `os.sep`
            # ('/') was intended to sort by path depth; confirm.
            dirs = sorted(snapshot.dirs, key=lambda x: x.count(os.pathsep))
            if len(dirs) > 15:
                message += f" {', '.join(dirs[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(dirs)}"

        if snapshot.files:
            message += f"\n\nFiles in output ({len(snapshot.files)} total):"
            files = sorted(snapshot.files, key=lambda x: x.count(os.pathsep))
            if len(files) > 15:
                message += f" {', '.join(files[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(files)}"

        if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
            raise ValueError(message)
        else:
            logger.warning(message)

    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return

        snapshot, wrapped_tgt = await concurrently(
            digest_to_snapshot(output_digest),
            resolve_target(
                WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()
            ),
        )
        # NOTE(review): when behavior is `warn`, control falls through to the
        # `any_match` logic below and may warn a second time — confirm intended.
        warn_or_raise(
            f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `all` "
            "which requires all output globs to actually match an output.",
            snapshot,
        )

    # Otherwise it is `GlobExpansionConjunction.any_match` which means only at least one glob must match.
    total_count = len(filtered_output_files) + len(filtered_output_directories)
    unused_count = len(unused_output_files) + len(unused_output_directories)
    if total_count == 0 or unused_count < total_count:
        return
    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )
    warn_or_raise(
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `any` "
        "which requires at least one output glob to actually match an output.",
        snapshot,
    )
635

636

637
@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    """Execute a prepared adhoc process and post-process its outputs.

    After execution: logs configured per-exit-code messages and (optionally) the
    process output, validates output globs via `check_outputs`, merges captured
    stdout/stderr files into the output digest, and strips the root output
    directory prefix when configured.
    """
    request = prepared_request.original_request

    result = await execute_process(prepared_request.process, **implicitly())

    # Log any message configured for this exit code.
    log_on_errors = request.log_on_process_errors or FrozenDict()
    error_to_log = log_on_errors.get(result.exit_code, None)
    if error_to_log:
        logger.error(error_to_log)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)

    root_output_directory: str | None = None
    if request.use_working_directory_as_base_for_output_captures:
        root_output_directory = parse_relative_directory(
            request.root_output_directory, working_directory
        )

    # Pairs of (capture file name, content); only configured captures are kept.
    extras = (
        (request.capture_stdout_file, result.stdout),
        (request.capture_stderr_file, result.stderr),
    )
    extra_contents = {i: j for i, j in extras if i}

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    output_digest = result.output_digest
    output_files: list[str] = list(request.output_files)
    output_directories: list[str] = list(request.output_directories)
    if request.use_working_directory_as_base_for_output_captures:
        # Globs are specified relative to the working directory; rebase them.
        output_files = [
            os.path.normpath(os.path.join(working_directory, of)) for of in output_files
        ]
        output_directories = [
            os.path.normpath(os.path.join(working_directory, od)) for od in output_directories
        ]
    await check_outputs(
        output_digest=output_digest,
        output_files=output_files,
        output_directories=output_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    # Materialize captured stdout/stderr as files and merge them into the outputs.
    if extra_contents:
        if request.use_working_directory_as_base_for_output_captures:
            extra_digest = await create_digest(
                CreateDigest(
                    FileContent(_parse_relative_file(name, working_directory), content)
                    for name, content in extra_contents.items()
                )
            )
        else:
            extra_digest = await create_digest(
                CreateDigest(FileContent(name, content) for name, content in extra_contents.items())
            )

        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )
714

715

716
# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
717
# Access time is not included to avoid having mere access to a file invalidating an execution.
718
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
3✔
719
    if m is None:
1✔
UNCOV
720
        return b""
×
721

722
    def dt_fmt(dt: datetime | None) -> str | None:
1✔
723
        if dt is not None:
1✔
724
            return dt.isoformat()
1✔
725
        return None
×
726

727
    d = {
1✔
728
        "path": m.path,
729
        "kind": str(m.kind),
730
        "length": m.length,
731
        "is_executable": m.is_executable,
732
        "unix_mode": m.unix_mode,
733
        "created": dt_fmt(m.created),
734
        "modified": dt_fmt(m.modified),
735
        "symlink_target": m.symlink_target,
736
    }
737

738
    return json.dumps(d, sort_keys=True).encode()
1✔
739

740

741
async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    """Return a stable SHA-256 hex digest over the metadata of all paths matching `path_globs`.

    Paths are sorted so the hash does not depend on glob-expansion order.
    """
    raw_paths = await path_globs_to_paths(path_globs)
    paths = sorted([*raw_paths.files, *raw_paths.dirs])
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in paths
    )

    # Compute a stable hash of all of the metadatas since the hash value should be stable
    # when used outside the process (for example, in the cache). (The `__hash__` dunder method
    # computes an unstable hash which can and does vary across different process invocations.)
    #
    # While it could be more of an intellectual correctness point than a necessity, it does matter,
    # however, for a single user to see the same behavior across process invocations if pantsd restarts.
    #
    # Note: This could probably use a non-cryptographic hash (e.g., Murmur), but that would require
    # a third party dependency.
    h = hashlib.sha256()
    for mr in metadata_results:
        h.update(_path_metadata_to_bytes(mr.metadata))
    return h.hexdigest()
761

762

763
@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    """Translate an `AdhocProcessRequest` into a concrete `Process` ready for execution."""
    address = request.address
    working_directory = parse_relative_directory(request.working_directory or "", address)

    env: dict[str, str] = dict(request.env_vars)

    # Compute the hash for any workspace invalidation sources and put the hash into the environment
    # as a dummy variable so that the process produced by this rule will be invalidated if any of
    # the referenced files change.
    if request.workspace_invalidation_globs is not None:
        invalidation_hash = await compute_workspace_invalidation_hash(
            request.workspace_invalidation_globs
        )
        env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if working_directory and working_directory not in input_snapshot.dirs:
        # The working directory is not already present in the input: materialize it.
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))
    else:
        # Needed to ensure that underlying filesystem does not change during run
        work_dir = EMPTY_DIGEST

    input_digest = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    # NOTE: local name `proc` avoids shadowing the module-level `process` import.
    proc = Process(
        argv=request.argv,
        description=f"Running {request.description}",
        env=env,
        input_digest=input_digest,
        output_directories=request.output_directories,
        output_files=request.output_files,
        timeout_seconds=request.timeout,
        working_directory=working_directory,
        append_only_caches=request.append_only_caches or FrozenDict(),
        immutable_input_digests=request.immutable_input_digests or FrozenDict(),
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        proc = _output_at_build_root(proc, bash)

    return PreparedAdhocProcessRequest(
        original_request=request,
        process=proc,
    )

820

821
class PathEnvModifyMode(Enum):
    """How the PATH environment variable should be augmented with extra path elements."""

    # Extra elements are placed before any existing PATH entries, so they take precedence.
    PREPEND = "prepend"
    # Extra elements are placed after any existing PATH entries.
    APPEND = "append"
    # PATH is left unmodified.
    OFF = "off"
827

828

829
async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    """Merge template-supplied environment variables into `existing_env_vars`.

    Templates of the form ``NAME=VALUE`` set ``NAME`` directly; a bare ``NAME`` is fetched
    from the local environment. A key that is already present (or set twice) is an error.
    `extra_paths` are folded into ``PATH`` according to `path_env_modify_mode`.
    """
    result: dict[str, str] = dict(existing_env_vars)

    names_to_fetch: set[str] = set()
    duplicates: set[str] = set()
    for template in env_vars_templates:
        name, sep, value = template.partition("=")
        if name in result:
            duplicates.add(name)
        if sep:
            result[name] = value
        else:
            names_to_fetch.add(name)

    if duplicates:
        dups_as_str = ", ".join(sorted(duplicates))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if names_to_fetch:
        fetched = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(names_to_fetch))), **implicitly()
        )
        result.update(fetched)

    def join_path(left: str | None, right: str | None) -> str | None:
        # Join two PATH fragments with ':', dropping whichever side is empty/None.
        if left and right:
            return f"{left}:{right}"
        return left or right or None

    if extra_paths:
        joined_extra = ":".join(extra_paths)
        current_path = result.get("PATH")

        updated_path: str | None = None
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            updated_path = join_path(joined_extra, current_path)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            updated_path = join_path(current_path, joined_extra)
        # PathEnvModifyMode.OFF leaves PATH untouched.

        if updated_path:
            result["PATH"] = updated_path

    return FrozenDict(result)
886

887

888
def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    """Return a copy of `process` that runs from the build root instead of its working directory.

    The working directory is folded into a `cd` inside a bash `-c` wrapper, and the declared
    output files/directories are re-rooted under it so output capture still resolves correctly.
    """
    workdir = process.working_directory or ""

    out_dirs = process.output_directories
    out_files = process.output_files
    if workdir:
        out_dirs = tuple(os.path.join(workdir, p) for p in out_dirs)
        out_files = tuple(os.path.join(workdir, p) for p in out_files)

    cd_prefix = f"cd {shlex.quote(workdir)} && " if workdir else ""
    wrapped_argv = (bash.path, "-c", f"{cd_prefix}{shlex.join(process.argv)}")

    return dataclasses.replace(
        process,
        argv=wrapped_argv,
        working_directory=None,
        output_directories=out_dirs,
        output_files=out_files,
    )

909

910
def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`."""
    base = relative_to.spec_path if isinstance(relative_to, Address) else relative_to

    if workdir_in.startswith("/"):
        # A leading slash means "relative to the build root".
        return workdir_in[1:]
    if workdir_in == ".":
        return base
    if workdir_in.startswith("./"):
        return os.path.join(base, workdir_in[2:])
    return workdir_in
926

927

928
def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`."""
    # A leading slash means "relative to the build root"; otherwise resolve against `relative_to`.
    return file_in[1:] if file_in.startswith("/") else os.path.join(relative_to, file_in)
936

937

938
def rules():
    """Return this module's rules, combined with the generic process-support rules."""
    return tuple(collect_rules()) + tuple(process.rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc