• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26260209689

21 May 2026 11:59PM UTC coverage: 75.453% (-15.7%) from 91.156%
26260209689

Pull #23365

github

web-flow
Merge 5fe873b58 into 7ea655ba0
Pull Request #23365: uv.lock -> pex optimization

5 of 16 new or added lines in 1 file covered. (31.25%)

10118 existing lines in 378 files now uncovered.

54669 of 72454 relevant lines covered (75.45%)

2.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.91
/src/python/pants/core/util_rules/adhoc_process_support.py
1
# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from __future__ import annotations
4✔
4

5
import dataclasses
4✔
6
import hashlib
4✔
7
import json
4✔
8
import logging
4✔
9
import os
4✔
10
import shlex
4✔
11
from collections.abc import Iterable, Mapping
4✔
12
from dataclasses import dataclass
4✔
13
from datetime import datetime
4✔
14
from enum import Enum
4✔
15
from textwrap import dedent  # noqa: PNT20
4✔
16
from typing import TypeVar
4✔
17

18
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
4✔
19
from pants.build_graph.address import Address
4✔
20
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
4✔
21
from pants.core.goals.package import (
4✔
22
    EnvironmentAwarePackageRequest,
23
    PackageFieldSet,
24
    environment_aware_package,
25
)
26
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
4✔
27
from pants.core.target_types import FileSourceField
4✔
28
from pants.core.util_rules.env_vars import environment_vars_subset
4✔
29
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
4✔
30
from pants.core.util_rules.system_binaries import BashBinary
4✔
31
from pants.engine import process
4✔
32
from pants.engine.addresses import Addresses, UnparsedAddressInputs
4✔
33
from pants.engine.env_vars import EnvironmentVarsRequest
4✔
34
from pants.engine.environment import EnvironmentName
4✔
35
from pants.engine.fs import (
4✔
36
    EMPTY_DIGEST,
37
    CreateDigest,
38
    Digest,
39
    DigestSubset,
40
    Directory,
41
    FileContent,
42
    GlobExpansionConjunction,
43
    MergeDigests,
44
    PathGlobs,
45
    PathMetadataRequest,
46
    Snapshot,
47
)
48
from pants.engine.internals.build_files import resolve_address
4✔
49
from pants.engine.internals.graph import (
4✔
50
    find_valid_field_sets,
51
    resolve_target,
52
    resolve_targets,
53
    resolve_unparsed_address_inputs,
54
    transitive_targets,
55
)
56
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
4✔
57
from pants.engine.intrinsics import (
4✔
58
    create_digest,
59
    digest_subset_to_digest,
60
    digest_to_snapshot,
61
    execute_process,
62
    merge_digests,
63
    path_globs_to_paths,
64
    path_metadata_request,
65
    remove_prefix,
66
)
67
from pants.engine.process import (
4✔
68
    FallibleProcessResult,
69
    Process,
70
    ProcessCacheScope,
71
    ProcessResult,
72
    ProductDescription,
73
    fallible_to_exec_result_or_raise,
74
)
75
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
4✔
76
from pants.engine.target import (
4✔
77
    FieldSetsPerTargetRequest,
78
    InvalidFieldException,
79
    SourcesField,
80
    Target,
81
    TransitiveTargetsRequest,
82
    WrappedTargetRequest,
83
)
84
from pants.util.frozendict import FrozenDict
4✔
85

86
logger = logging.getLogger(__name__)
4✔
87

88

89
@dataclass(frozen=True)
class AdhocProcessRequest:
    """Everything needed to prepare, run, and post-process one ad-hoc process.

    `prepare_adhoc_process` turns this into a `Process`; `run_prepared_adhoc_process` then uses
    the same request to log results, validate outputs, and adjust the output digest.
    """

    # Used in the process description ("Running <description>") and carried into the result.
    description: str
    # Owning target's address: the base for resolving `working_directory`, and the address named
    # in output-validation messages.
    address: Address
    # Resolved with `parse_relative_directory` against `address`.
    working_directory: str
    # Resolved against the working directory and stripped as a prefix from the output digest,
    # but only when `use_working_directory_as_base_for_output_captures` is true.
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    # Merged with a digest for the working directory (when that directory is not already in
    # the inputs) to form the process's input digest.
    input_digest: Digest
    # `None` is treated as empty.
    immutable_input_digests: FrozenDict[str, Digest] | None
    # `None` is treated as empty.
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    # Copied into the process environment.
    env_vars: FrozenDict[str, str]
    # Maps an exit code to a message that is logged at error level when the process exits
    # with that code. `None` is treated as empty.
    log_on_process_errors: FrozenDict[int, str] | None
    # When true, stdout is logged at info level and stderr at warning level.
    log_output: bool
    # If set, the process's stdout bytes are added to the output digest under this file name.
    capture_stdout_file: str | None
    # If set, the process's stderr bytes are added to the output digest under this file name.
    capture_stderr_file: str | None
    # If set, a hash of the matched paths' metadata is placed in the process environment so
    # that changes to those paths invalidate the process.
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    # When true, expected outputs and capture files are resolved relative to the working
    # directory during output validation and capture.
    use_working_directory_as_base_for_output_captures: bool = True
    # Whether unmatched output globs raise (`error`) or are only logged as a warning.
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    # `all_match`: every output glob must match; `any_match`: at least one must match;
    # `None`: outputs are not checked.
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match
4✔
112

113

114
@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    """An `AdhocProcessRequest` paired with the `Process` built from it.

    Input to `run_prepared_adhoc_process`.
    """

    # The request the process was prepared from; consulted again after execution for logging,
    # output validation, and output adjustment.
    original_request: AdhocProcessRequest
    # The process to execute.
    process: Process
4✔
118

119

120
@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    """Result of running an ad-hoc process, before its exit status has been enforced."""

    # The raw result of executing the process.
    process_result: FallibleProcessResult
    # The process's output digest plus any captured stdout/stderr files, with the root output
    # directory prefix removed when the request asked for that.
    adjusted_digest: Digest
    # The originating request's description; used as the `ProductDescription` when this result
    # is converted to an `AdhocProcessResult`.
    description: str
4✔
125

126

127
@dataclass(frozen=True)
class AdhocProcessResult:
    """Result of an ad-hoc process after `fallible_to_exec_result_or_raise` has been applied."""

    # The result produced by `fallible_to_exec_result_or_raise`.
    process_result: ProcessResult
    # Carried over unchanged from `FallibleAdhocProcessResult.adjusted_digest`.
    adjusted_digest: Digest
4✔
131

132

133
@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    """Request to resolve a target's execution and runnable dependencies into sandbox inputs."""

    # The target that declares the dependencies; address specs are resolved relative to it.
    address: Address
    # Raw address specs; `None` is treated as no execution dependencies.
    execution_dependencies: tuple[str, ...] | None
    # Raw address specs of runnable targets; `None` or empty yields no runnable dependencies.
    runnable_dependencies: tuple[str, ...] | None
4✔
138

139

140
@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    """Output of `resolve_execution_environment`."""

    # Merge of the dependencies' source files, the runnable dependencies' digests, and the
    # digests of any packages built from the dependencies.
    digest: Digest
    # `None` when no runnable dependencies were requested.
    runnable_dependencies: RunnableDependencies | None
4✔
144

145

146
@dataclass(frozen=True)
class RunnableDependencies:
    """Sandbox additions that expose runnable dependencies as executable shims."""

    # Name of the immutable-input directory holding the generated shim scripts;
    # `create_tool_runner` adds `{chroot}/<path_component>` to the extra paths.
    path_component: str
    # The shim directory's digest plus the runnables' own immutable input digests.
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    # Environment the shims rely on; `_resolve_runnable_dependencies` sets `_PANTS_SHIM_ROOT`
    # to the `{chroot}` placeholder.
    extra_env: Mapping[str, str]
4✔
152

153

154
@dataclass(frozen=True)
class ToolRunnerRequest:
    """Request for `create_tool_runner` to set up a runnable target as a tool."""

    # Address spec of the runnable target, parsed relative to `target`'s spec path.
    runnable_address_str: str
    # Extra arguments appended after the runnable's own arguments.
    args: tuple[str, ...]
    # Raw address specs forwarded to `ResolveExecutionDependenciesRequest`.
    execution_dependencies: tuple[str, ...]
    # Raw address specs forwarded to `ResolveExecutionDependenciesRequest`.
    runnable_dependencies: tuple[str, ...]
    # The target making the request; its address owns the dependency specs and the execution
    # environment name is resolved from it.
    target: Target
    # Name of the field holding `runnable_address_str`; used only in the error message raised
    # when the address does not refer to a runnable target.
    runnable_address_field_alias: str
    # Merged over the runnable's append-only caches, taking precedence on key collisions.
    named_caches: FrozenDict[str, str] | None = None
4✔
163

164

165
@dataclass(frozen=True)
class ToolRunner:
    """Output of `create_tool_runner`: the pieces needed to invoke a runnable target."""

    # Merge of the resolved dependencies' digest and the runnable's own digest.
    digest: Digest
    # The runnable's arguments followed by the request's extra arguments.
    args: tuple[str, ...]
    # Merged environment from the runnable and from any runnable dependencies.
    extra_env: FrozenDict[str, str]
    # The runnable's `PATH` value (if it set one) followed by the runnable-dependency shim
    # directory (if any).
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]
4✔
173

174

175
#
176
# Things that need a home
177
#
178

179

180
@dataclass(frozen=True)
class ExtraSandboxContents:
    """A bundle of additions to a process sandbox, mergeable via `MergeExtraSandboxContents`."""

    # Files to add to the sandbox.
    digest: Digest
    # Path entries contributed by this bundle; concatenated in order when bundles are merged.
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]
4✔
187

188

189
@dataclass(frozen=True)
class MergeExtraSandboxContents:
    """Request to combine several `ExtraSandboxContents` into one."""

    # Bundles to merge, in order.
    additions: tuple[ExtraSandboxContents, ...]
4✔
192

193

194
@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    """Combine every bundle in `request.additions` into a single `ExtraSandboxContents`.

    Digests are merged, `paths` are concatenated in order, and the three mappings are unioned
    with `_safe_update`, which raises `ValueError` if two bundles bind the same key to
    different values.
    """
    merged_paths: list[str] = []
    merged_immutable_inputs: dict[str, Digest] = {}
    merged_caches: dict[str, str] = {}
    merged_env: dict[str, str] = {}

    for contents in request.additions:
        merged_paths.extend(contents.paths or ())
        _safe_update(merged_immutable_inputs, contents.immutable_input_digests)
        _safe_update(merged_caches, contents.append_only_caches)
        _safe_update(merged_env, contents.extra_env)

    # Only reached when none of the mappings conflicted.
    merged_digest = await merge_digests(
        MergeDigests([contents.digest for contents in request.additions])
    )

    return ExtraSandboxContents(
        digest=merged_digest,
        paths=tuple(merged_paths),
        immutable_input_digests=FrozenDict(merged_immutable_inputs),
        append_only_caches=FrozenDict(merged_caches),
        extra_env=FrozenDict(merged_env),
    )
221

222

223
#
224
# END THINGS THAT NEED A HOME
225
#
226

227

228
@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    """Turn a fallible ad-hoc result into a non-fallible one.

    Delegates to `fallible_to_exec_result_or_raise`, supplying the result's description as the
    `ProductDescription`, and carries the adjusted digest across unchanged.
    """
    process_result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    adjusted_digest = fallible_result.adjusted_digest
    return AdhocProcessResult(process_result=process_result, adjusted_digest=adjusted_digest)
243

244

245
async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    """Resolve runnable-dependency address specs into sandbox contents plus launcher shims.

    Returns `(EMPTY_DIGEST, None)` when `deps` is empty or `None`. Otherwise returns the merged
    digest of all runnables together with a `RunnableDependencies` describing a directory of
    shim scripts (one per dependency, named after the target name) and the immutable inputs,
    caches, and environment they need.

    Raises `ValueError` if any address refers to a target with no `RunFieldSet`.
    """
    if not deps:
        return EMPTY_DIGEST, None

    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    field_sets_per_target = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    # Report the first dependency that has no applicable run field set.
    not_runnable = next(
        (
            candidate
            for candidate, field_sets in zip(addresses, field_sets_per_target.collection)
            if not field_sets
        ),
        None,
    )
    if not_runnable is not None:
        raise ValueError(
            dedent(
                f"""\
                Address `{not_runnable.spec}` was specified as a runnable dependency, but is not
                runnable.
                """
            )
        )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_sets[0]: RunFieldSet}))
        for field_sets in field_sets_per_target.collection
    )

    # Each runnable contributes its own sandbox contents...
    extras: list[ExtraSandboxContents] = [
        ExtraSandboxContents(
            digest=runnable.digest,
            paths=(),
            immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
            append_only_caches=FrozenDict(runnable.append_only_caches or {}),
            extra_env=FrozenDict(),
        )
        for runnable in runnables
    ]
    # ...and one executable shim, named after the target, that launches it.
    shims: list[FileContent] = [
        FileContent(
            address.target_name,
            _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
            is_executable=True,
        )
        for address, runnable in zip(addresses, runnables)
    ]

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    # The shim directory is named after its own fingerprint and mounted as an immutable input.
    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    runnable_dependencies = RunnableDependencies(
        shim_digest_path,
        FrozenDict(immutable_input_digests),
        merged_extras.append_only_caches,
        FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
    )
    return merged_extras.digest, runnable_dependencies
320

321

322
@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    """Resolve a target's execution and runnable dependencies into a single input digest.

    The resulting digest merges:
      - the source files (with codegen enabled) of the transitive closure of the execution
        dependencies,
      - the outputs of packaging every dependency that has a `PackageFieldSet`, and
      - the digests of any runnable dependencies.

    The returned `runnable_dependencies` is `None` when the request names none.
    """
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    # Exclude the requesting target itself from the roots. This must be an equality check:
    # the root targets carry their own `Address` instances, so an identity (`is not`)
    # comparison against `target_address` would never filter anything out.
    all_dependencies = (
        *(i for i in transitive.roots if i.address != target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)
3✔
381

382

383
K = TypeVar("K")
4✔
384
V = TypeVar("V")
4✔
385

386

387
def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
4✔
388
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
389
    dictionaries, but with a different value."""
390

391
    for k, v in d2.items():
3✔
392
        if k in d1 and d1[k] != v:
3✔
393
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
×
394
        d1[k] = v
3✔
395
    return d1
3✔
396

397

398
def _runnable_dependency_shim(
4✔
399
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
400
) -> bytes:
401
    """The binary shim script to be placed in the output directory for the digest."""
402

403
    def _quote(s: str) -> str:
3✔
404
        quoted = shlex.quote(s)
3✔
405
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")
3✔
406

407
    binary = " ".join(_quote(arg) for arg in args)
3✔
408
    env_str = "\n".join(
3✔
409
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
410
    )
411
    return dedent(
3✔
412
        f"""\
413
        #!{bash}
414
        {env_str}
415
        exec {binary} "$@"
416
        """
417
    ).encode()
418

419

420
@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    """Set up the runnable target named by `request` so it can be invoked as a tool.

    Resolves the runnable's address relative to the requesting target, checks that it has a
    `RunFieldSet`, resolves the requested execution/runnable dependencies, and generates the
    run-in-sandbox request in the requesting target's environment. The runnable's sandbox
    needs and those of any runnable dependencies are then merged into one `ToolRunner`.

    Raises `InvalidFieldException` if the address does not refer to a runnable target.
    """
    requesting_target = request.target

    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=requesting_target.address.spec_path,
                    description_of_origin=f"Runnable target for {requesting_target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(requesting_target), **implicitly()
        ),
    )

    if not run_field_sets.field_sets:
        raise InvalidFieldException(
            f"The `{request.runnable_address_field_alias}` field in target `{request.target.address}` must be set to the "
            f"address of a target which can be run by the `run` goal. Instead, the `{request.runnable_address_field_alias}` field is "
            f"set to `{request.runnable_address_str}` which refers to the `{runnable_targets[0].alias}` target "
            f"at `{runnable_targets[0].address}` that cannot be run by the `run` goal."
        )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    req = ResolveExecutionDependenciesRequest(
        address=requesting_target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    # Must be run in target environment so that the binaries/envvars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    # The runnable's `PATH` (if any) is surfaced as an extra path entry.
    # NOTE(review): `PATH` is read here but not removed from the environment that is passed
    # on below, so it appears in both `extra_paths` and `extra_env` — confirm this is intended.
    runnable_path = (run_request.extra_env or {}).get("PATH", None)

    # First the runnable's own sandbox needs...
    extra_sandbox_contents = [
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(runnable_path,) if runnable_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    ]

    # ...then, if present, the shim directory and needs of the runnable dependencies.
    runnable_dependencies = execution_environment.runnable_dependencies
    if runnable_dependencies:
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((execution_environment.digest, run_request.digest))),
    )

    # Caches named on the request override same-named caches from the runnable.
    append_only_caches = dict(merged_extras.append_only_caches)
    append_only_caches.update(request.named_caches or {})

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(dict(merged_extras.extra_env)),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )
524

525

526
async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist.

    Each entry of `output_files` and `output_directories` is matched against `output_digest`
    on its own; an entry whose match is empty counts as unused.

    - `outputs_match_mode is None`: no check is made.
    - `all_match`: fails if any entry is unused.
    - otherwise (`any_match`): fails only if there is at least one entry and all are unused.

    A failure raises `ValueError` when `outputs_match_error_behavior` is
    `GlobMatchErrorBehavior.error`; for any other value it is logged as a warning. The message
    lists the unused globs and up to 15 directories and 15 files of the actual output,
    shallowest paths first.
    """

    if outputs_match_mode is None:
        return

    # Both collections are traversed more than once below, so materialize them in case a
    # caller passes a one-shot iterator.
    output_files = tuple(output_files)
    output_directories = tuple(output_directories)

    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in zip(output_files, filtered_output_files_digests)
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in zip(output_directories, filtered_output_directory_digests)
        if digest == EMPTY_DIGEST
    )

    # Decide whether the configured mode is violated; return early when it is satisfied.
    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return
        mode_description = (
            "`all` which requires all output globs to actually match an output."
        )
    else:
        # Otherwise it is `GlobExpansionConjunction.any_match` which means only at least one
        # glob must match.
        total_count = len(output_files) + len(output_directories)
        unused_count = len(unused_output_files) + len(unused_output_directories)
        if total_count == 0 or unused_count < total_count:
            return
        mode_description = (
            "`any` which requires at least one output glob to actually match an output."
        )

    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )

    def describe(label: str, paths: Iterable[str]) -> str:
        # List the shallowest paths first. Depth is counted with the path separator `os.sep`;
        # `os.pathsep` is the separator between `PATH` entries and would make every key 0.
        ordered = sorted(paths, key=lambda path: path.count(os.sep))
        header = f"\n\n{label} in output ({len(ordered)} total):"
        if len(ordered) > 15:
            return f"{header} {', '.join(ordered[0:15])}, ... (trimmed for brevity)"
        return f"{header} {', '.join(ordered)}"

    unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
    message = (
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with "
        f"`outputs_match_mode` set to {mode_description}"
        f"\n\nThe following output globs were unused: {unused_globs_str}"
    )
    if snapshot.dirs:
        message += describe("Directories", snapshot.dirs)
    if snapshot.files:
        message += describe("Files", snapshot.files)

    if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
        raise ValueError(message)
    logger.warning(message)
637

638

639
@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    """Execute a prepared ad-hoc process and post-process its result.

    After execution this logs any message configured for the exit code, optionally logs
    stdout/stderr, validates the expected outputs via `check_outputs`, adds captured
    stdout/stderr files to the output digest, and strips the root output directory prefix
    when outputs are captured relative to the working directory.
    """
    request = prepared_request.original_request
    relative_to_workdir = request.use_working_directory_as_base_for_output_captures

    result = await execute_process(prepared_request.process, **implicitly())

    exit_code_messages = request.log_on_process_errors or FrozenDict()
    exit_code_message = exit_code_messages.get(result.exit_code, None)
    if exit_code_message:
        logger.error(exit_code_message)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)
    root_output_directory: str | None = (
        parse_relative_directory(request.root_output_directory, working_directory)
        if relative_to_workdir
        else None
    )

    # Streams to materialize as files in the output, keyed by the requested file name.
    captured_streams = {}
    if request.capture_stdout_file:
        captured_streams[request.capture_stdout_file] = result.stdout
    if request.capture_stderr_file:
        captured_streams[request.capture_stderr_file] = result.stderr

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    expected_files: list[str] = list(request.output_files)
    expected_directories: list[str] = list(request.output_directories)
    if relative_to_workdir:
        expected_files = [
            os.path.normpath(os.path.join(working_directory, path)) for path in expected_files
        ]
        expected_directories = [
            os.path.normpath(os.path.join(working_directory, path))
            for path in expected_directories
        ]
    output_digest = result.output_digest
    await check_outputs(
        output_digest=output_digest,
        output_files=expected_files,
        output_directories=expected_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    if captured_streams:
        # Capture file names are resolved against the working directory only when outputs
        # are being captured relative to it.
        captured_files = [
            FileContent(
                _parse_relative_file(name, working_directory) if relative_to_workdir else name,
                content,
            )
            for name, content in captured_streams.items()
        ]
        extra_digest = await create_digest(CreateDigest(captured_files))
        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )
716

717

718
# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
719
# Access time is not included to avoid having mere access to a file invalidating an execution.
720
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
4✔
721
    if m is None:
1✔
UNCOV
722
        return b""
×
723

724
    def dt_fmt(dt: datetime | None) -> str | None:
1✔
725
        if dt is not None:
1✔
726
            return dt.isoformat()
1✔
727
        return None
×
728

729
    d = {
1✔
730
        "path": m.path,
731
        "kind": str(m.kind),
732
        "length": m.length,
733
        "is_executable": m.is_executable,
734
        "unix_mode": m.unix_mode,
735
        "created": dt_fmt(m.created),
736
        "modified": dt_fmt(m.modified),
737
        "symlink_target": m.symlink_target,
738
    }
739

740
    return json.dumps(d, sort_keys=True).encode()
1✔
741

742

743
async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    """Return a SHA-256 hex digest over the metadata of every path matched by `path_globs`.

    Matched files and directories are processed in sorted path order, and each path's metadata
    is serialized with `_path_metadata_to_bytes` before being hashed.
    """
    matched = await path_globs_to_paths(path_globs)
    ordered_paths = sorted((*matched.files, *matched.dirs))
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in ordered_paths
    )

    # The hash must be stable outside this process (for example, in the cache), so the
    # built-in `__hash__` is unsuitable: its value can and does vary across process
    # invocations. Stability also means a single user sees the same behavior if pantsd
    # restarts.
    #
    # Note: a non-cryptographic hash (e.g., Murmur) would probably suffice, but that would
    # require a third-party dependency.
    serialized = b"".join(
        _path_metadata_to_bytes(metadata_result.metadata) for metadata_result in metadata_results
    )
    return hashlib.sha256(serialized).hexdigest()
1✔
763

764

765
@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    """Turn an `AdhocProcessRequest` into a `Process` wrapped in a `PreparedAdhocProcessRequest`.

    The request's working directory is resolved relative to its address, an optional workspace
    invalidation hash is injected into the environment, and the working directory is created in
    the input digest when the inputs do not already contain it.
    """
    working_directory = parse_relative_directory(
        request.working_directory or "", request.address
    )
    timeout: int | None = request.timeout

    command_env: dict[str, str] = dict(request.env_vars)

    # Hash any workspace invalidation sources and expose the hash through a dummy environment
    # variable, so that the process produced by this rule is invalidated whenever any of the
    # referenced files change.
    invalidation_globs = request.workspace_invalidation_globs
    if invalidation_globs is not None:
        invalidation_hash = await compute_workspace_invalidation_hash(invalidation_globs)
        command_env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if working_directory and working_directory not in input_snapshot.dirs:
        # The working directory is not part of the inputs yet, so add an empty one.
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))
    else:
        # Needed to ensure that underlying filesystem does not change during run
        work_dir = EMPTY_DIGEST

    merged_inputs = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    prepared = Process(
        argv=request.argv,
        description=f"Running {request.description}",
        env=command_env,
        input_digest=merged_inputs,
        output_directories=request.output_directories,
        output_files=request.output_files,
        timeout_seconds=timeout,
        working_directory=working_directory,
        append_only_caches=request.append_only_caches or FrozenDict(),
        immutable_input_digests=request.immutable_input_digests or FrozenDict(),
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        prepared = _output_at_build_root(prepared, bash)

    return PreparedAdhocProcessRequest(original_request=request, process=prepared)
821

822

823
class PathEnvModifyMode(Enum):
    """Strategy for combining extra path elements with the PATH environment variable."""

    # Place the extra path elements before the existing PATH value.
    PREPEND = "prepend"
    # Place the extra path elements after the existing PATH value.
    APPEND = "append"
    # Leave PATH untouched.
    OFF = "off"
4✔
829

830

831
async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    """Build the final environment for a process.

    Each template is either `NAME=value`, which sets the variable directly, or a bare `NAME`,
    whose value is fetched through `environment_vars_subset`. When `extra_paths` is non-empty,
    its elements are joined with `:` and prepended or appended to `PATH` according to
    `path_env_modify_mode`.

    Raises:
        ValueError: if a template names a variable that is already defined at the point the
            template is processed.
    """
    merged: dict[str, str] = dict(existing_env_vars)

    names_to_fetch: set[str] = set()
    redefined: set[str] = set()
    for template in env_vars_templates:
        # Only the first "=" separates the name from the value; later ones belong to the value.
        name, separator, value = template.partition("=")
        if name in merged:
            redefined.add(name)

        if separator:
            merged[name] = value
        else:
            names_to_fetch.add(name)

    if redefined:
        dups_as_str = ", ".join(sorted(redefined))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if names_to_fetch:
        fetched = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(names_to_fetch))), **implicitly()
        )
        merged.update(fetched)

    if extra_paths:
        current_path = merged.get("PATH")
        additions = ":".join(extra_paths)

        ordered: tuple[str | None, ...]
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            ordered = (additions, current_path)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            ordered = (current_path, additions)
        else:
            ordered = ()

        # Skip missing or empty sides so the result never has a leading or trailing ":".
        combined = ":".join(part for part in ordered if part)
        if combined:
            merged["PATH"] = combined

    return FrozenDict(merged)
3✔
888

889

890
def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    """Rewrite `process` so it runs via `bash -c` with no `working_directory` set.

    When the process has a working directory, the command is prefixed with a `cd` into it and
    the output files/directories are prefixed with that directory.
    """
    workdir = process.working_directory or ""
    command = shlex.join(process.argv)

    if workdir:
        captured_dirs = tuple(os.path.join(workdir, entry) for entry in process.output_directories)
        captured_files = tuple(os.path.join(workdir, entry) for entry in process.output_files)
        # The process no longer sets a working directory, so the shell changes into it instead.
        command = f"cd {shlex.quote(workdir)} && {command}"
    else:
        captured_dirs = process.output_directories
        captured_files = process.output_files

    return dataclasses.replace(
        process,
        argv=(bash.path, "-c", command),
        working_directory=None,
        output_directories=captured_dirs,
        output_files=captured_files,
    )
910

911

912
def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`.

    `.` and paths starting with `./` are resolved against `relative_to` (an address's
    `spec_path`, or the given string). A leading `/` is stripped. Anything else is returned
    unchanged.
    """
    if workdir_in.startswith("/"):
        return workdir_in[1:]

    if workdir_in != "." and not workdir_in.startswith("./"):
        return workdir_in

    # Only the "." and "./..." forms depend on the base directory.
    base = relative_to.spec_path if isinstance(relative_to, Address) else relative_to
    if workdir_in == ".":
        return base
    return os.path.join(base, workdir_in[2:])
1✔
928

929

930
def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`.

    A leading `/` is stripped from `file_in`; any other value is joined onto `relative_to`.
    """
    has_leading_slash = file_in.startswith("/")
    return file_in[1:] if has_leading_slash else os.path.join(relative_to, file_in)
1✔
938

939

940
def rules():
    """Return the rules collected from this module followed by those from `process.rules()`."""
    own_rules = collect_rules()
    return (*own_rules, *process.rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc