pantsbuild / pants · build 25441711719 (push, via GitHub web-flow)

06 May 2026 02:31PM UTC · coverage: 92.915% (remained the same)
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Actions best practices we recently enabled in #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next follow-up would then be upgrades and
harmonizing the generated and non-generated pins.

Note: this idea was suggested by Claude while going over pinact output,
and I was surprised to see that post-processing the YAML wasn't too
gross.
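
As a rough illustration (a hypothetical sketch, not the actual generator code),
the post-processing amounts to rewriting each mutable `uses:` reference into an
immutable SHA pin, keeping the human-readable tag as a trailing comment:

    import re

    # Hypothetical tag -> (sha, tag) table; real values come from resolving
    # each tag against the action repository (e.g. with pinact). The SHA below
    # is a placeholder, not a real commit.
    SHA_PINS = {
        "actions/checkout@v4": ("0000000000000000000000000000000000000000", "v4"),
    }

    def pin_uses_refs(workflow_yaml: str) -> str:
        """Rewrite `uses: owner/repo@tag` into `uses: owner/repo@<sha>  # tag`."""

        def repl(m: re.Match) -> str:
            pin = SHA_PINS.get(m.group("ref"))
            if pin is None:
                return m.group(0)  # leave unknown references untouched
            action = m.group("ref").split("@", 1)[0]
            sha, tag = pin
            return f"{m.group('prefix')}{action}@{sha}  # {tag}"

        return re.sub(r"(?P<prefix>uses:\s*)(?P<ref>[\w./-]+@[\w.-]+)", repl, workflow_yaml)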

92206 of 99237 relevant lines covered (92.91%) · 4.04 hits per line

Source file: /src/python/pants/core/util_rules/adhoc_process_support.py (96.7% covered)

# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations

import dataclasses
import hashlib
import json
import logging
import os
import shlex
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from textwrap import dedent  # noqa: PNT20
from typing import TypeVar

from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
from pants.build_graph.address import Address
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
from pants.core.goals.package import (
    EnvironmentAwarePackageRequest,
    PackageFieldSet,
    environment_aware_package,
)
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
from pants.core.target_types import FileSourceField
from pants.core.util_rules.env_vars import environment_vars_subset
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
from pants.core.util_rules.system_binaries import BashBinary
from pants.engine import process
from pants.engine.addresses import Addresses, UnparsedAddressInputs
from pants.engine.env_vars import EnvironmentVarsRequest
from pants.engine.environment import EnvironmentName
from pants.engine.fs import (
    EMPTY_DIGEST,
    CreateDigest,
    Digest,
    DigestSubset,
    Directory,
    FileContent,
    GlobExpansionConjunction,
    MergeDigests,
    PathGlobs,
    PathMetadataRequest,
    Snapshot,
)
from pants.engine.internals.build_files import resolve_address
from pants.engine.internals.graph import (
    find_valid_field_sets,
    resolve_target,
    resolve_targets,
    resolve_unparsed_address_inputs,
    transitive_targets,
)
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
from pants.engine.intrinsics import (
    create_digest,
    digest_subset_to_digest,
    digest_to_snapshot,
    execute_process,
    merge_digests,
    path_globs_to_paths,
    path_metadata_request,
    remove_prefix,
)
from pants.engine.process import (
    FallibleProcessResult,
    Process,
    ProcessCacheScope,
    ProcessResult,
    ProductDescription,
    fallible_to_exec_result_or_raise,
)
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
from pants.engine.target import (
    FieldSetsPerTargetRequest,
    InvalidFieldException,
    SourcesField,
    Target,
    TransitiveTargetsRequest,
    WrappedTargetRequest,
)
from pants.util.frozendict import FrozenDict

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AdhocProcessRequest:
    description: str
    address: Address
    working_directory: str
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest] | None
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    env_vars: FrozenDict[str, str]
    log_on_process_errors: FrozenDict[int, str] | None
    log_output: bool
    capture_stdout_file: str | None
    capture_stderr_file: str | None
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    use_working_directory_as_base_for_output_captures: bool = True
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match


@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    original_request: AdhocProcessRequest
    process: Process


@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    process_result: FallibleProcessResult
    adjusted_digest: Digest
    description: str


@dataclass(frozen=True)
class AdhocProcessResult:
    process_result: ProcessResult
    adjusted_digest: Digest


@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    address: Address
    execution_dependencies: tuple[str, ...] | None
    runnable_dependencies: tuple[str, ...] | None


@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    digest: Digest
    runnable_dependencies: RunnableDependencies | None


@dataclass(frozen=True)
class RunnableDependencies:
    path_component: str
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class ToolRunnerRequest:
    runnable_address_str: str
    args: tuple[str, ...]
    execution_dependencies: tuple[str, ...]
    runnable_dependencies: tuple[str, ...]
    target: Target
    runnable_address_field_alias: str
    named_caches: FrozenDict[str, str] | None = None


@dataclass(frozen=True)
class ToolRunner:
    digest: Digest
    args: tuple[str, ...]
    extra_env: FrozenDict[str, str]
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]


#
# Things that need a home
#


@dataclass(frozen=True)
class ExtraSandboxContents:
    digest: Digest
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class MergeExtraSandboxContents:
    additions: tuple[ExtraSandboxContents, ...]


@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    additions = request.additions

    digests: list[Digest] = []
    paths: list[str] = []
    immutable_input_digests: dict[str, Digest] = {}
    append_only_caches: dict[str, str] = {}
    extra_env: dict[str, str] = {}

    for addition in additions:
        digests.append(addition.digest)
        if addition.paths:
            paths.extend(addition.paths)
        _safe_update(immutable_input_digests, addition.immutable_input_digests)
        _safe_update(append_only_caches, addition.append_only_caches)
        _safe_update(extra_env, addition.extra_env)

    digest = await merge_digests(MergeDigests(digests))

    return ExtraSandboxContents(
        digest=digest,
        paths=tuple(paths),
        immutable_input_digests=FrozenDict(immutable_input_digests),
        append_only_caches=FrozenDict(append_only_caches),
        extra_env=FrozenDict(extra_env),
    )


#
# END THINGS THAT NEED A HOME
#


@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    return AdhocProcessResult(
        process_result=result, adjusted_digest=fallible_result.adjusted_digest
    )


async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    if not deps:
        return EMPTY_DIGEST, None

    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    fspt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    for address, field_set in zip(addresses, fspt.collection):
        if not field_set:
            raise ValueError(
                dedent(
                    f"""\
                    Address `{address.spec}` was specified as a runnable dependency, but is not
                    runnable.
                    """
                )
            )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_set[0]: RunFieldSet}))
        for field_set in fspt.collection
    )

    shims: list[FileContent] = []
    extras: list[ExtraSandboxContents] = []

    for address, runnable in zip(addresses, runnables):
        extras.append(
            ExtraSandboxContents(
                digest=runnable.digest,
                paths=(),
                immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
                append_only_caches=FrozenDict(runnable.append_only_caches or {}),
                extra_env=FrozenDict(),
            )
        )
        shims.append(
            FileContent(
                address.target_name,
                _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
                is_executable=True,
            )
        )

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    return (
        merged_extras.digest,
        RunnableDependencies(
            shim_digest_path,
            FrozenDict(immutable_input_digests),
            merged_extras.append_only_caches,
            FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
        ),
    )


@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified.
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    all_dependencies = (
        *(i for i in transitive.roots if i.address is not target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)


K = TypeVar("K")
V = TypeVar("V")


def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
    dictionaries, but with a different value."""

    for k, v in d2.items():
        if k in d1 and d1[k] != v:
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
        d1[k] = v
    return d1
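

# Example: _safe_update({"A": "1"}, {"B": "2"}) returns {"A": "1", "B": "2"},
# while _safe_update({"A": "1"}, {"A": "2"}) raises ValueError.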


def _runnable_dependency_shim(
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
) -> bytes:
    """The binary shim script to be placed in the output directory for the digest."""

    def _quote(s: str) -> str:
        quoted = shlex.quote(s)
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")

    binary = " ".join(_quote(arg) for arg in args)
    env_str = "\n".join(
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
    )
    return dedent(
        f"""\
        #!{bash}
        {env_str}
        exec {binary} "$@"
        """
    ).encode()
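

# Example: for bash="/bin/bash", args=("/bin/tool", "--flag"), and
# extra_env={"FOO": "bar"}, the generated shim is:
#
#   #!/bin/bash
#   export FOO=bar
#   exec /bin/tool --flag "$@"
#
# Any literal "{chroot}" in args or env values is rewritten to
# "'${_PANTS_SHIM_ROOT}'" so the path resolves inside the sandbox at runtime.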


@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=request.target.address.spec_path,
                    description_of_origin=f"Runnable target for {request.target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(request.target), **implicitly()
        ),
    )

    if not run_field_sets.field_sets:
        raise InvalidFieldException(
            f"The `{request.runnable_address_field_alias}` field in target `{request.target.address}` must be set to the "
            f"address of a target which can be run by the `run` goal. Instead, the `{request.runnable_address_field_alias}` field is "
            f"set to `{request.runnable_address_str}` which refers to the `{runnable_targets[0].alias}` target "
            f"at `{runnable_targets[0].address}` that cannot be run by the `run` goal."
        )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    req = ResolveExecutionDependenciesRequest(
        address=request.target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    # Must be run in the target environment so that the binaries/env vars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    dependencies_digest = execution_environment.digest
    runnable_dependencies = execution_environment.runnable_dependencies

    extra_env: dict[str, str] = dict(run_request.extra_env or {})
    extra_path = extra_env.pop("PATH", None)

    extra_sandbox_contents = []

    extra_sandbox_contents.append(
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(extra_path,) if extra_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    )

    if runnable_dependencies:
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((dependencies_digest, run_request.digest))),
    )

    extra_env = dict(merged_extras.extra_env)

    append_only_caches = {
        **merged_extras.append_only_caches,
        **(request.named_caches or {}),
    }

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(extra_env),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )


async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist."""

    if outputs_match_mode is None:
        return

    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    filtered_output_files = tuple(zip(output_files, filtered_output_files_digests))
    filtered_output_directories = tuple(zip(output_directories, filtered_output_directory_digests))

    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in filtered_output_files
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in filtered_output_directories
        if digest == EMPTY_DIGEST
    )

    def warn_or_raise(message: str, snapshot: Snapshot) -> None:
        unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
        message = f"{message}\n\nThe following output globs were unused: {unused_globs_str}"

        if snapshot.dirs:
            message += f"\n\nDirectories in output ({len(snapshot.dirs)} total):"
            # Sort shallowest-first by path depth (i.e., by directory separators).
            dirs = sorted(snapshot.dirs, key=lambda x: x.count(os.sep))
            if len(dirs) > 15:
                message += f" {', '.join(dirs[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(dirs)}"

        if snapshot.files:
            message += f"\n\nFiles in output ({len(snapshot.files)} total):"
            files = sorted(snapshot.files, key=lambda x: x.count(os.sep))
            if len(files) > 15:
                message += f" {', '.join(files[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(files)}"

        if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
            raise ValueError(message)
        else:
            logger.warning(message)

    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return

        snapshot, wrapped_tgt = await concurrently(
            digest_to_snapshot(output_digest),
            resolve_target(
                WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()
            ),
        )
        warn_or_raise(
            f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `all` "
            "which requires all output globs to actually match an output.",
            snapshot,
        )
        # Return here; otherwise execution silently falls through and warns a second time.
        return

    # Otherwise it is `GlobExpansionConjunction.any_match`, which means at least one glob must match.
    total_count = len(filtered_output_files) + len(filtered_output_directories)
    unused_count = len(unused_output_files) + len(unused_output_directories)
    if total_count == 0 or unused_count < total_count:
        return
    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )
    warn_or_raise(
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `any` "
        "which requires at least one output glob to actually match an output.",
        snapshot,
    )


@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    request = prepared_request.original_request

    result = await execute_process(prepared_request.process, **implicitly())

    log_on_errors = request.log_on_process_errors or FrozenDict()
    error_to_log = log_on_errors.get(result.exit_code, None)
    if error_to_log:
        logger.error(error_to_log)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)

    root_output_directory: str | None = None
    if request.use_working_directory_as_base_for_output_captures:
        root_output_directory = parse_relative_directory(
            request.root_output_directory, working_directory
        )

    extras = (
        (request.capture_stdout_file, result.stdout),
        (request.capture_stderr_file, result.stderr),
    )
    extra_contents = {i: j for i, j in extras if i}

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    output_digest = result.output_digest
    output_files: list[str] = list(request.output_files)
    output_directories: list[str] = list(request.output_directories)
    if request.use_working_directory_as_base_for_output_captures:
        output_files = [
            os.path.normpath(os.path.join(working_directory, of)) for of in output_files
        ]
        output_directories = [
            os.path.normpath(os.path.join(working_directory, od)) for od in output_directories
        ]
    await check_outputs(
        output_digest=output_digest,
        output_files=output_files,
        output_directories=output_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    if extra_contents:
        if request.use_working_directory_as_base_for_output_captures:
            extra_digest = await create_digest(
                CreateDigest(
                    FileContent(_parse_relative_file(name, working_directory), content)
                    for name, content in extra_contents.items()
                )
            )
        else:
            extra_digest = await create_digest(
                CreateDigest(FileContent(name, content) for name, content in extra_contents.items())
            )

        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )


# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
# Access time is not included to avoid having mere access to a file invalidating an execution.
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
    if m is None:
        return b""

    def dt_fmt(dt: datetime | None) -> str | None:
        if dt is not None:
            return dt.isoformat()
        return None

    d = {
        "path": m.path,
        "kind": str(m.kind),
        "length": m.length,
        "is_executable": m.is_executable,
        "unix_mode": m.unix_mode,
        "created": dt_fmt(m.created),
        "modified": dt_fmt(m.modified),
        "symlink_target": m.symlink_target,
    }

    return json.dumps(d, sort_keys=True).encode()
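

# Illustrative serialization (hypothetical values): a regular file might encode,
# before hashing, as
#   {"created": "2026-05-06T14:31:00", "is_executable": false, "kind": "file",
#    "length": 42, "modified": "2026-05-06T14:31:00", "path": "src/app/run.sh",
#    "symlink_target": null, "unix_mode": 420}
# with keys sorted so the byte stream is stable across processes.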


async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    raw_paths = await path_globs_to_paths(path_globs)
    paths = sorted([*raw_paths.files, *raw_paths.dirs])
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in paths
    )

    # Compute a stable hash of all of the metadata since the hash value should be stable
    # when used outside the process (for example, in the cache). (The `__hash__` dunder method
    # computes an unstable hash which can and does vary across different process invocations.)
    #
    # While this is arguably more a point of correctness than a necessity, it does matter for
    # a single user to see the same behavior across process invocations if pantsd restarts.
    #
    # Note: This could probably use a non-cryptographic hash (e.g., Murmur), but that would
    # require a third-party dependency.
    h = hashlib.sha256()
    for mr in metadata_results:
        h.update(_path_metadata_to_bytes(mr.metadata))
    return h.hexdigest()


@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    description = request.description
    address = request.address
    working_directory = parse_relative_directory(request.working_directory or "", address)
    argv = request.argv
    timeout: int | None = request.timeout
    output_files = request.output_files
    output_directories = request.output_directories
    append_only_caches = request.append_only_caches or FrozenDict()
    immutable_input_digests = request.immutable_input_digests or FrozenDict()

    command_env: dict[str, str] = dict(request.env_vars)

    # Compute the hash for any workspace invalidation sources and put it into the environment
    # as a dummy variable so that the process produced by this rule will be invalidated if any
    # of the referenced files change.
    if request.workspace_invalidation_globs is not None:
        workspace_invalidation_hash = await compute_workspace_invalidation_hash(
            request.workspace_invalidation_globs
        )
        command_env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = workspace_invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if not working_directory or working_directory in input_snapshot.dirs:
        # Needed to ensure that the underlying filesystem does not change during the run.
        work_dir = EMPTY_DIGEST
    else:
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))

    input_digest = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    process = Process(
        argv=argv,
        description=f"Running {description}",
        env=command_env,
        input_digest=input_digest,
        output_directories=output_directories,
        output_files=output_files,
        timeout_seconds=timeout,
        working_directory=working_directory,
        append_only_caches=append_only_caches,
        immutable_input_digests=immutable_input_digests,
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        process = _output_at_build_root(process, bash)

    return PreparedAdhocProcessRequest(
        original_request=request,
        process=process,
    )


class PathEnvModifyMode(Enum):
    """How the PATH environment variable should be augmented with extra path elements."""

    PREPEND = "prepend"
    APPEND = "append"
    OFF = "off"


async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    env_vars: dict[str, str] = dict(existing_env_vars)

    to_fetch: set[str] = set()
    duplicate_keys: set[str] = set()
    for env_var in env_vars_templates:
        parts = env_var.split("=", 1)
        if parts[0] in env_vars:
            duplicate_keys.add(parts[0])

        if len(parts) == 2:
            env_vars[parts[0]] = parts[1]
        else:
            to_fetch.add(parts[0])

    if duplicate_keys:
        dups_as_str = ", ".join(sorted(duplicate_keys))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if to_fetch:
        fetched_env_vars = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(to_fetch))), **implicitly()
        )
        env_vars.update(fetched_env_vars)

    def path_env_join(left: str | None, right: str | None) -> str | None:
        if not left and not right:
            return None
        if left and not right:
            return left
        if not left and right:
            return right
        return f"{left}:{right}"

    if extra_paths:
        existing_path_env = env_vars.get("PATH")
        extra_paths_as_str = ":".join(extra_paths)

        new_path_env: str | None = None
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            new_path_env = path_env_join(extra_paths_as_str, existing_path_env)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            new_path_env = path_env_join(existing_path_env, extra_paths_as_str)

        if new_path_env:
            env_vars["PATH"] = new_path_env

    return FrozenDict(env_vars)
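

# Example: env_vars_templates=("FOO=bar", "HOME") sets FOO to "bar" directly and
# fetches HOME from the ambient environment. With extra_paths=("/opt/bin",) and
# the default PREPEND mode, PATH becomes "/opt/bin:<existing PATH>".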


def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    working_directory = process.working_directory or ""

    output_directories = process.output_directories
    output_files = process.output_files
    if working_directory:
        output_directories = tuple(os.path.join(working_directory, d) for d in output_directories)
        output_files = tuple(os.path.join(working_directory, d) for d in output_files)

    cd = f"cd {shlex.quote(working_directory)} && " if working_directory else ""
    shlexed_argv = shlex.join(process.argv)
    new_argv = (bash.path, "-c", f"{cd}{shlexed_argv}")

    return dataclasses.replace(
        process,
        argv=new_argv,
        working_directory=None,
        output_directories=output_directories,
        output_files=output_files,
    )
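

# Example: with working_directory="src/app" and argv=("./run.sh", "arg"), the
# returned process has argv=(<bash path>, "-c", "cd src/app && ./run.sh arg"),
# no working directory, and its output globs re-rooted under "src/app".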


def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`."""

    if isinstance(relative_to, Address):
        reldir = relative_to.spec_path
    else:
        reldir = relative_to

    if workdir_in == ".":
        return reldir
    elif workdir_in.startswith("./"):
        return os.path.join(reldir, workdir_in[2:])
    elif workdir_in.startswith("/"):
        return workdir_in[1:]
    else:
        return workdir_in
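

# Example: relative to an address at "src/proj": "." -> "src/proj",
# "./sub" -> "src/proj/sub", "/abs" -> "abs", and "other" -> "other".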


def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`."""

    if file_in.startswith("/"):
        return file_in[1:]

    return os.path.join(relative_to, file_in)


def rules():
    return (
        *collect_rules(),
        *process.rules(),
    )