/src/python/pants/core/util_rules/adhoc_process_support.py

# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations

import dataclasses
import hashlib
import json
import logging
import os
import shlex
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from textwrap import dedent  # noqa: PNT20
from typing import TypeVar

from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
from pants.build_graph.address import Address
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
from pants.core.goals.package import (
    EnvironmentAwarePackageRequest,
    PackageFieldSet,
    environment_aware_package,
)
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
from pants.core.target_types import FileSourceField
from pants.core.util_rules.env_vars import environment_vars_subset
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
from pants.core.util_rules.system_binaries import BashBinary
from pants.engine import process
from pants.engine.addresses import Addresses, UnparsedAddressInputs
from pants.engine.env_vars import EnvironmentVarsRequest
from pants.engine.environment import EnvironmentName
from pants.engine.fs import (
    EMPTY_DIGEST,
    CreateDigest,
    Digest,
    DigestSubset,
    Directory,
    FileContent,
    GlobExpansionConjunction,
    MergeDigests,
    PathGlobs,
    PathMetadataRequest,
    Snapshot,
)
from pants.engine.internals.build_files import resolve_address
from pants.engine.internals.graph import (
    find_valid_field_sets,
    resolve_target,
    resolve_targets,
    resolve_unparsed_address_inputs,
    transitive_targets,
)
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
from pants.engine.intrinsics import (
    create_digest,
    digest_subset_to_digest,
    digest_to_snapshot,
    execute_process,
    merge_digests,
    path_globs_to_paths,
    path_metadata_request,
    remove_prefix,
)
from pants.engine.process import (
    FallibleProcessResult,
    Process,
    ProcessCacheScope,
    ProcessResult,
    ProductDescription,
    fallible_to_exec_result_or_raise,
)
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
from pants.engine.target import (
    FieldSetsPerTargetRequest,
    InvalidFieldException,
    SourcesField,
    Target,
    TransitiveTargetsRequest,
    WrappedTargetRequest,
)
from pants.util.frozendict import FrozenDict

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AdhocProcessRequest:
    description: str
    address: Address
    working_directory: str
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest] | None
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    env_vars: FrozenDict[str, str]
    log_on_process_errors: FrozenDict[int, str] | None
    log_output: bool
    capture_stdout_file: str | None
    capture_stderr_file: str | None
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    use_working_directory_as_base_for_output_captures: bool = True
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match


@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    original_request: AdhocProcessRequest
    process: Process


@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    process_result: FallibleProcessResult
    adjusted_digest: Digest
    description: str


@dataclass(frozen=True)
class AdhocProcessResult:
    process_result: ProcessResult
    adjusted_digest: Digest


@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    address: Address
    execution_dependencies: tuple[str, ...] | None
    runnable_dependencies: tuple[str, ...] | None


@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    digest: Digest
    runnable_dependencies: RunnableDependencies | None


@dataclass(frozen=True)
class RunnableDependencies:
    path_component: str
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class ToolRunnerRequest:
    runnable_address_str: str
    args: tuple[str, ...]
    execution_dependencies: tuple[str, ...]
    runnable_dependencies: tuple[str, ...]
    target: Target
    runnable_address_field_alias: str
    named_caches: FrozenDict[str, str] | None = None


@dataclass(frozen=True)
class ToolRunner:
    digest: Digest
    args: tuple[str, ...]
    extra_env: FrozenDict[str, str]
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]


#
# Things that need a home
#


@dataclass(frozen=True)
class ExtraSandboxContents:
    digest: Digest
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class MergeExtraSandboxContents:
    additions: tuple[ExtraSandboxContents, ...]


@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    additions = request.additions

    digests: list[Digest] = []
    paths: list[str] = []
    immutable_input_digests: dict[str, Digest] = {}
    append_only_caches: dict[str, str] = {}
    extra_env: dict[str, str] = {}

    for addition in additions:
        digests.append(addition.digest)
        if addition.paths:
            paths.extend(addition.paths)
        _safe_update(immutable_input_digests, addition.immutable_input_digests)
        _safe_update(append_only_caches, addition.append_only_caches)
        _safe_update(extra_env, addition.extra_env)

    digest = await merge_digests(MergeDigests(digests))

    return ExtraSandboxContents(
        digest=digest,
        paths=tuple(paths),
        immutable_input_digests=FrozenDict(immutable_input_digests),
        append_only_caches=FrozenDict(append_only_caches),
        extra_env=FrozenDict(extra_env),
    )


#
# END THINGS THAT NEED A HOME
#


@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    return AdhocProcessResult(
        process_result=result, adjusted_digest=fallible_result.adjusted_digest
    )


async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    if not deps:
        return EMPTY_DIGEST, None

    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    fspt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    for address, field_set in zip(addresses, fspt.collection):
        if not field_set:
            raise ValueError(
                dedent(
                    f"""\
                    Address `{address.spec}` was specified as a runnable dependency, but is not
                    runnable.
                    """
                )
            )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_set[0]: RunFieldSet}))
        for field_set in fspt.collection
    )

    shims: list[FileContent] = []
    extras: list[ExtraSandboxContents] = []

    for address, runnable in zip(addresses, runnables):
        extras.append(
            ExtraSandboxContents(
                digest=runnable.digest,
                paths=(),
                immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
                append_only_caches=FrozenDict(runnable.append_only_caches or {}),
                extra_env=FrozenDict(),
            )
        )
        shims.append(
            FileContent(
                address.target_name,
                _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
                is_executable=True,
            )
        )

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    return (
        merged_extras.digest,
        RunnableDependencies(
            shim_digest_path,
            FrozenDict(immutable_input_digests),
            merged_extras.append_only_caches,
            FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
        ),
    )


@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    all_dependencies = (
        *(i for i in transitive.roots if i.address is not target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)


K = TypeVar("K")
V = TypeVar("V")


def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
    dictionaries, but with a different value."""

    for k, v in d2.items():
        if k in d1 and d1[k] != v:
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
        d1[k] = v
    return d1
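

# For example (illustrative values, tracing the logic above):
#   _safe_update({"A": "1"}, {"B": "2"}) returns {"A": "1", "B": "2"}
#   _safe_update({"A": "1"}, {"A": "1"}) returns {"A": "1"} (identical values are allowed)
#   _safe_update({"A": "1"}, {"A": "2"}) raises ValueError (conflicting values)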


def _runnable_dependency_shim(
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
) -> bytes:
    """The binary shim script to be placed in the output directory for the digest."""

    def _quote(s: str) -> str:
        quoted = shlex.quote(s)
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")

    binary = " ".join(_quote(arg) for arg in args)
    env_str = "\n".join(
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
    )
    return dedent(
        f"""\
        #!{bash}
        {env_str}
        exec {binary} "$@"
        """
    ).encode()
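

# For example, a call such as (values are illustrative):
#   _runnable_dependency_shim("/bin/bash", ("/usr/bin/tool", "--flag"), {"FOO": "bar"})
# produces a shim script of the form:
#   #!/bin/bash
#   export FOO=bar
#   exec /usr/bin/tool --flag "$@"
# Any literal "{chroot}" in the args or env values is rewritten to reference
# "${_PANTS_SHIM_ROOT}" so that the shim resolves it at runtime.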


@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=request.target.address.spec_path,
                    description_of_origin=f"Runnable target for {request.target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(request.target), **implicitly()
        ),
    )

    if not run_field_sets.field_sets:
        raise InvalidFieldException(
            f"The `{request.runnable_address_field_alias}` field in target `{request.target.address}` must be set to the "
            f"address of a target which can be run by the `run` goal. Instead, the `{request.runnable_address_field_alias}` field is "
            f"set to `{request.runnable_address_str}` which refers to the `{runnable_targets[0].alias}` target "
            f"at `{runnable_targets[0].address}` that cannot be run by the `run` goal."
        )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    req = ResolveExecutionDependenciesRequest(
        address=request.target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    # Must be run in target environment so that the binaries/envvars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    dependencies_digest = execution_environment.digest
    runnable_dependencies = execution_environment.runnable_dependencies

    extra_env: dict[str, str] = dict(run_request.extra_env or {})
    extra_path = extra_env.pop("PATH", None)

    extra_sandbox_contents = []

    extra_sandbox_contents.append(
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(extra_path,) if extra_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    )

    if runnable_dependencies:
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((dependencies_digest, run_request.digest))),
    )

    extra_env = dict(merged_extras.extra_env)

    append_only_caches = {
        **merged_extras.append_only_caches,
        **(request.named_caches or {}),
    }

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(extra_env),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )


async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist."""

    if outputs_match_mode is None:
        return

    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    filtered_output_files = tuple(zip(output_files, filtered_output_files_digests))
    filtered_output_directories = tuple(zip(output_directories, filtered_output_directory_digests))

    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in filtered_output_files
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in filtered_output_directories
        if digest == EMPTY_DIGEST
    )

    def warn_or_raise(message: str, snapshot: Snapshot) -> None:
        unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
        message = f"{message}\n\nThe following output globs were unused: {unused_globs_str}"

        if snapshot.dirs:
            message += f"\n\nDirectories in output ({len(snapshot.dirs)} total):"
            dirs = sorted(snapshot.dirs, key=lambda x: x.count(os.sep))
            if len(dirs) > 15:
                message += f" {', '.join(dirs[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(dirs)}"

        if snapshot.files:
            message += f"\n\nFiles in output ({len(snapshot.files)} total):"
            files = sorted(snapshot.files, key=lambda x: x.count(os.sep))
            if len(files) > 15:
                message += f" {', '.join(files[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(files)}"

        if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
            raise ValueError(message)
        else:
            logger.warning(message)

    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return

        snapshot, wrapped_tgt = await concurrently(
            digest_to_snapshot(output_digest),
            resolve_target(
                WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()
            ),
        )
        warn_or_raise(
            f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `all` "
            "which requires all output globs to actually match an output.",
            snapshot,
        )

    # Otherwise it is `GlobExpansionConjunction.any_match`, which means at least one glob must match.
    total_count = len(filtered_output_files) + len(filtered_output_directories)
    unused_count = len(unused_output_files) + len(unused_output_directories)
    if total_count == 0 or unused_count < total_count:
        return
    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )
    warn_or_raise(
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `any` "
        "which requires at least one output glob to actually match an output.",
        snapshot,
    )


@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    request = prepared_request.original_request

    result = await execute_process(prepared_request.process, **implicitly())

    log_on_errors = request.log_on_process_errors or FrozenDict()
    error_to_log = log_on_errors.get(result.exit_code, None)
    if error_to_log:
        logger.error(error_to_log)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)

    root_output_directory: str | None = None
    if request.use_working_directory_as_base_for_output_captures:
        root_output_directory = parse_relative_directory(
            request.root_output_directory, working_directory
        )

    extras = (
        (request.capture_stdout_file, result.stdout),
        (request.capture_stderr_file, result.stderr),
    )
    extra_contents = {i: j for i, j in extras if i}

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    output_digest = result.output_digest
    output_files: list[str] = list(request.output_files)
    output_directories: list[str] = list(request.output_directories)
    if request.use_working_directory_as_base_for_output_captures:
        output_files = [
            os.path.normpath(os.path.join(working_directory, of)) for of in output_files
        ]
        output_directories = [
            os.path.normpath(os.path.join(working_directory, od)) for od in output_directories
        ]
    await check_outputs(
        output_digest=output_digest,
        output_files=output_files,
        output_directories=output_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    if extra_contents:
        if request.use_working_directory_as_base_for_output_captures:
            extra_digest = await create_digest(
                CreateDigest(
                    FileContent(_parse_relative_file(name, working_directory), content)
                    for name, content in extra_contents.items()
                )
            )
        else:
            extra_digest = await create_digest(
                CreateDigest(FileContent(name, content) for name, content in extra_contents.items())
            )

        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )


# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
# Access time is not included to avoid having mere access to a file invalidating an execution.
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
    if m is None:
        return b""

    def dt_fmt(dt: datetime | None) -> str | None:
        if dt is not None:
            return dt.isoformat()
        return None

    d = {
        "path": m.path,
        "kind": str(m.kind),
        "length": m.length,
        "is_executable": m.is_executable,
        "unix_mode": m.unix_mode,
        "created": dt_fmt(m.created),
        "modified": dt_fmt(m.modified),
        "symlink_target": m.symlink_target,
    }

    return json.dumps(d, sort_keys=True).encode()


async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    raw_paths = await path_globs_to_paths(path_globs)
    paths = sorted([*raw_paths.files, *raw_paths.dirs])
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in paths
    )

    # Compute a stable hash of all of the metadata entries, since the hash value should be stable
    # when used outside the process (for example, in the cache). (The `__hash__` dunder method
    # computes an unstable hash which can and does vary across different process invocations.)
    #
    # While this may be more a point of correctness than a necessity, it does matter that a single
    # user sees the same behavior across process invocations if pantsd restarts.
    #
    # Note: This could probably use a non-cryptographic hash (e.g., Murmur), but that would require
    # a third-party dependency.
    h = hashlib.sha256()
    for mr in metadata_results:
        h.update(_path_metadata_to_bytes(mr.metadata))
    return h.hexdigest()


@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    description = request.description
    address = request.address
    working_directory = parse_relative_directory(request.working_directory or "", address)
    argv = request.argv
    timeout: int | None = request.timeout
    output_files = request.output_files
    output_directories = request.output_directories
    append_only_caches = request.append_only_caches or FrozenDict()
    immutable_input_digests = request.immutable_input_digests or FrozenDict()

    command_env: dict[str, str] = dict(request.env_vars)

    # Compute the hash for any workspace invalidation sources and put the hash into the environment
    # as a dummy variable so that the process produced by this rule will be invalidated if any of
    # the referenced files change.
    if request.workspace_invalidation_globs is not None:
        workspace_invalidation_hash = await compute_workspace_invalidation_hash(
            request.workspace_invalidation_globs
        )
        command_env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = workspace_invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if not working_directory or working_directory in input_snapshot.dirs:
        # Needed to ensure that the underlying filesystem does not change during the run.
        work_dir = EMPTY_DIGEST
    else:
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))

    input_digest = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    process = Process(
        argv=argv,
        description=f"Running {description}",
        env=command_env,
        input_digest=input_digest,
        output_directories=output_directories,
        output_files=output_files,
        timeout_seconds=timeout,
        working_directory=working_directory,
        append_only_caches=append_only_caches,
        immutable_input_digests=immutable_input_digests,
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        process = _output_at_build_root(process, bash)

    return PreparedAdhocProcessRequest(
        original_request=request,
        process=process,
    )


class PathEnvModifyMode(Enum):
    """How the PATH environment variable should be augmented with extra path elements."""

    PREPEND = "prepend"
    APPEND = "append"
    OFF = "off"
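

# For example, assuming an incoming environment where PATH is "/usr/bin" and
# extra_paths=("/opt/tool/bin",) (illustrative values), `prepare_env_vars` below would yield:
#   PathEnvModifyMode.PREPEND -> PATH="/opt/tool/bin:/usr/bin"
#   PathEnvModifyMode.APPEND  -> PATH="/usr/bin:/opt/tool/bin"
#   PathEnvModifyMode.OFF     -> PATH left unchanged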


async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    env_vars: dict[str, str] = dict(existing_env_vars)

    to_fetch: set[str] = set()
    duplicate_keys: set[str] = set()
    for env_var in env_vars_templates:
        parts = env_var.split("=", 1)
        if parts[0] in env_vars:
            duplicate_keys.add(parts[0])

        if len(parts) == 2:
            env_vars[parts[0]] = parts[1]
        else:
            to_fetch.add(parts[0])

    if duplicate_keys:
        dups_as_str = ", ".join(sorted(duplicate_keys))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if to_fetch:
        fetched_env_vars = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(to_fetch))), **implicitly()
        )
        env_vars.update(fetched_env_vars)

    def path_env_join(left: str | None, right: str | None) -> str | None:
        if not left and not right:
            return None
        if left and not right:
            return left
        if not left and right:
            return right
        return f"{left}:{right}"

    if extra_paths:
        existing_path_env = env_vars.get("PATH")
        extra_paths_as_str = ":".join(extra_paths)

        new_path_env: str | None = None
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            new_path_env = path_env_join(extra_paths_as_str, existing_path_env)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            new_path_env = path_env_join(existing_path_env, extra_paths_as_str)

        if new_path_env:
            env_vars["PATH"] = new_path_env

    return FrozenDict(env_vars)


def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    working_directory = process.working_directory or ""

    output_directories = process.output_directories
    output_files = process.output_files
    if working_directory:
        output_directories = tuple(os.path.join(working_directory, d) for d in output_directories)
        output_files = tuple(os.path.join(working_directory, d) for d in output_files)

    cd = f"cd {shlex.quote(working_directory)} && " if working_directory else ""
    shlexed_argv = shlex.join(process.argv)
    new_argv = (bash.path, "-c", f"{cd}{shlexed_argv}")

    return dataclasses.replace(
        process,
        argv=new_argv,
        working_directory=None,
        output_directories=output_directories,
        output_files=output_files,
    )
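

# For example (illustrative values, assuming `bash.path` is "/bin/bash"): a Process with
# argv=("./gen.sh",), working_directory="src/app", and output_files=("out.txt",) is rewritten
# to roughly:
#   argv=("/bin/bash", "-c", "cd src/app && ./gen.sh")
#   working_directory=None
#   output_files=("src/app/out.txt",)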


def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`."""

    if isinstance(relative_to, Address):
        reldir = relative_to.spec_path
    else:
        reldir = relative_to

    if workdir_in == ".":
        return reldir
    elif workdir_in.startswith("./"):
        return os.path.join(reldir, workdir_in[2:])
    elif workdir_in.startswith("/"):
        return workdir_in[1:]
    else:
        return workdir_in
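

# For example, if `relative_to` is an Address whose spec_path is "src/app" (illustrative):
#   parse_relative_directory(".", address)        -> "src/app"
#   parse_relative_directory("./subdir", address) -> "src/app/subdir"
#   parse_relative_directory("/dist", address)    -> "dist" (relative to the build root)
#   parse_relative_directory("other", address)    -> "other"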


def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`."""

    if file_in.startswith("/"):
        return file_in[1:]

    return os.path.join(relative_to, file_in)
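

# For example (illustrative values):
#   _parse_relative_file("/stdout.log", "src/app") -> "stdout.log" (relative to the build root)
#   _parse_relative_file("stdout.log", "src/app")  -> "src/app/stdout.log"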


def rules():
    return (
        *collect_rules(),
        *process.rules(),
    )