
pantsbuild / pants · build 18252174847 (push via github, web-flow)
05 Oct 2025 01:36AM UTC · coverage: 43.382% (-36.9%) from 80.261%

run tests on mac arm (#22717)

Just doing the minimal to pull forward the x86_64 pattern.

ref #20993

25776 of 59416 relevant lines covered (43.38%) · 1.3 hits per line

Source File: /src/python/pants/core/util_rules/adhoc_process_support.py (file coverage: 36.67%)
# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations

import dataclasses
import hashlib
import json
import logging
import os
import shlex
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from textwrap import dedent  # noqa: PNT20
from typing import TypeVar

from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
from pants.build_graph.address import Address
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
from pants.core.goals.package import (
    EnvironmentAwarePackageRequest,
    PackageFieldSet,
    environment_aware_package,
)
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
from pants.core.target_types import FileSourceField
from pants.core.util_rules.env_vars import environment_vars_subset
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
from pants.core.util_rules.system_binaries import BashBinary
from pants.engine import process
from pants.engine.addresses import Addresses, UnparsedAddressInputs
from pants.engine.env_vars import EnvironmentVarsRequest
from pants.engine.environment import EnvironmentName
from pants.engine.fs import (
    EMPTY_DIGEST,
    CreateDigest,
    Digest,
    DigestSubset,
    Directory,
    FileContent,
    GlobExpansionConjunction,
    MergeDigests,
    PathGlobs,
    PathMetadataRequest,
    Snapshot,
)
from pants.engine.internals.build_files import resolve_address
from pants.engine.internals.graph import (
    find_valid_field_sets,
    resolve_target,
    resolve_targets,
    resolve_unparsed_address_inputs,
    transitive_targets,
)
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
from pants.engine.intrinsics import (
    create_digest,
    digest_subset_to_digest,
    digest_to_snapshot,
    execute_process,
    merge_digests,
    path_globs_to_paths,
    path_metadata_request,
    remove_prefix,
)
from pants.engine.process import (
    FallibleProcessResult,
    Process,
    ProcessCacheScope,
    ProcessResult,
    ProductDescription,
    fallible_to_exec_result_or_raise,
)
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
from pants.engine.target import (
    FieldSetsPerTargetRequest,
    SourcesField,
    Target,
    TransitiveTargetsRequest,
    WrappedTargetRequest,
)
from pants.util.frozendict import FrozenDict

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AdhocProcessRequest:
    description: str
    address: Address
    working_directory: str
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest] | None
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    env_vars: FrozenDict[str, str]
    log_on_process_errors: FrozenDict[int, str] | None
    log_output: bool
    capture_stdout_file: str | None
    capture_stderr_file: str | None
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    use_working_directory_as_base_for_output_captures: bool = True
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match


@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    original_request: AdhocProcessRequest
    process: Process


@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    process_result: FallibleProcessResult
    adjusted_digest: Digest
    description: str


@dataclass(frozen=True)
class AdhocProcessResult:
    process_result: ProcessResult
    adjusted_digest: Digest


@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    address: Address
    execution_dependencies: tuple[str, ...] | None
    runnable_dependencies: tuple[str, ...] | None


@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    digest: Digest
    runnable_dependencies: RunnableDependencies | None


@dataclass(frozen=True)
class RunnableDependencies:
    path_component: str
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class ToolRunnerRequest:
    runnable_address_str: str
    args: tuple[str, ...]
    execution_dependencies: tuple[str, ...]
    runnable_dependencies: tuple[str, ...]
    target: Target
    named_caches: FrozenDict[str, str] | None = None


@dataclass(frozen=True)
class ToolRunner:
    digest: Digest
    args: tuple[str, ...]
    extra_env: FrozenDict[str, str]
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]


#
# Things that need a home
#


@dataclass(frozen=True)
class ExtraSandboxContents:
    digest: Digest
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class MergeExtraSandboxContents:
    additions: tuple[ExtraSandboxContents, ...]


@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    additions = request.additions

    digests: list[Digest] = []
    paths: list[str] = []
    immutable_input_digests: dict[str, Digest] = {}
    append_only_caches: dict[str, str] = {}
    extra_env: dict[str, str] = {}

    for addition in additions:
        digests.append(addition.digest)
        if addition.paths:
            paths.extend(addition.paths)
        _safe_update(immutable_input_digests, addition.immutable_input_digests)
        _safe_update(append_only_caches, addition.append_only_caches)
        _safe_update(extra_env, addition.extra_env)

    digest = await merge_digests(MergeDigests(digests))

    return ExtraSandboxContents(
        digest=digest,
        paths=tuple(paths),
        immutable_input_digests=FrozenDict(immutable_input_digests),
        append_only_caches=FrozenDict(append_only_caches),
        extra_env=FrozenDict(extra_env),
    )


#
# END THINGS THAT NEED A HOME
#


@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    return AdhocProcessResult(
        process_result=result, adjusted_digest=fallible_result.adjusted_digest
    )


async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    if not deps:
        return EMPTY_DIGEST, None

    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    fspt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    for address, field_set in zip(addresses, fspt.collection):
        if not field_set:
            raise ValueError(
                dedent(
                    f"""\
                    Address `{address.spec}` was specified as a runnable dependency, but is not
                    runnable.
                    """
                )
            )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_set[0]: RunFieldSet}))
        for field_set in fspt.collection
    )

    shims: list[FileContent] = []
    extras: list[ExtraSandboxContents] = []

    for address, runnable in zip(addresses, runnables):
        extras.append(
            ExtraSandboxContents(
                digest=runnable.digest,
                paths=(),
                immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
                append_only_caches=FrozenDict(runnable.append_only_caches or {}),
                extra_env=FrozenDict(),
            )
        )
        shims.append(
            FileContent(
                address.target_name,
                _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
                is_executable=True,
            )
        )

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    return (
        merged_extras.digest,
        RunnableDependencies(
            shim_digest_path,
            FrozenDict(immutable_input_digests),
            merged_extras.append_only_caches,
            FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
        ),
    )


@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    all_dependencies = (
        *(i for i in transitive.roots if i.address is not target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)


K = TypeVar("K")
V = TypeVar("V")


def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
    dictionaries, but with a different value."""

    for k, v in d2.items():
        if k in d1 and d1[k] != v:
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
        d1[k] = v
    return d1

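# A small usage sketch of `_safe_update` (behavior follows directly from the implementation above):
#
#   _safe_update({"A": "1"}, {"B": "2"})   # -> {"A": "1", "B": "2"}
#   _safe_update({"A": "1"}, {"A": "1"})   # -> {"A": "1"} (identical values are allowed)
#   _safe_update({"A": "1"}, {"A": "2"})   # raises ValueError (conflicting values)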

def _runnable_dependency_shim(
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
) -> bytes:
    """The binary shim script to be placed in the output directory for the digest."""

    def _quote(s: str) -> str:
        quoted = shlex.quote(s)
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")

    binary = " ".join(_quote(arg) for arg in args)
    env_str = "\n".join(
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
    )
    return dedent(
        f"""\
        #!{bash}
        {env_str}
        exec {binary} "$@"
        """
    ).encode()

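# For example, `_runnable_dependency_shim("/bin/bash", ["/usr/bin/python3", "-u"], {"FOO": "bar"})`
# produces the script below; any `{chroot}` occurrences in the args or env values are rewritten to
# `'${_PANTS_SHIM_ROOT}'` so the sandbox root is substituted at runtime:
#
#   #!/bin/bash
#   export FOO=bar
#   exec /usr/bin/python3 -u "$@"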

@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=request.target.address.spec_path,
                    description_of_origin=f"Runnable target for {request.target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(request.target), **implicitly()
        ),
    )

    req = ResolveExecutionDependenciesRequest(
        address=request.target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    # Must be run in target environment so that the binaries/envvars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    dependencies_digest = execution_environment.digest
    runnable_dependencies = execution_environment.runnable_dependencies

    extra_env: dict[str, str] = dict(run_request.extra_env or {})
    extra_path = extra_env.pop("PATH", None)

    extra_sandbox_contents = []

    extra_sandbox_contents.append(
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(extra_path,) if extra_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    )

    if runnable_dependencies:
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((dependencies_digest, run_request.digest))),
    )

    extra_env = dict(merged_extras.extra_env)

    append_only_caches = {
        **merged_extras.append_only_caches,
        **(request.named_caches or {}),  # type: ignore[dict-item]
    }

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(extra_env),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )


async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist."""

    if outputs_match_mode is None:
        return

    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    filtered_output_files = tuple(zip(output_files, filtered_output_files_digests))
    filtered_output_directories = tuple(zip(output_directories, filtered_output_directory_digests))

    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in filtered_output_files
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in filtered_output_directories
        if digest == EMPTY_DIGEST
    )

    def warn_or_raise(message: str, snapshot: Snapshot) -> None:
        unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
        message = f"{message}\n\nThe following output globs were unused: {unused_globs_str}"

        if snapshot.dirs:
            message += f"\n\nDirectories in output ({len(snapshot.dirs)} total):"
            dirs = sorted(snapshot.dirs, key=lambda x: x.count(os.pathsep))
            if len(dirs) > 15:
                message += f" {', '.join(dirs[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(dirs)}"

        if snapshot.files:
            message += f"\n\nFiles in output ({len(snapshot.files)} total):"
            files = sorted(snapshot.files, key=lambda x: x.count(os.pathsep))
            if len(files) > 15:
                message += f" {', '.join(files[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(files)}"

        if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
            raise ValueError(message)
        else:
            logger.warning(message)

    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return

        snapshot, wrapped_tgt = await concurrently(
            digest_to_snapshot(output_digest),
            resolve_target(
                WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()
            ),
        )
        warn_or_raise(
            f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `all` "
            "which requires all output globs to actually match an output.",
            snapshot,
        )

    # Otherwise it is `GlobExpansionConjunction.any_match`, which means that at least one glob must match.
    total_count = len(filtered_output_files) + len(filtered_output_directories)
    unused_count = len(unused_output_files) + len(unused_output_directories)
    if total_count == 0 or unused_count < total_count:
        return
    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )
    warn_or_raise(
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `any` "
        "which requires at least one output glob to actually match an output.",
        snapshot,
    )

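# For example, with `output_files=["dist/report.txt"]` and an output digest containing nothing:
#   - `outputs_match_mode=GlobExpansionConjunction.all_match` triggers `warn_or_raise`, since every
#     glob is required to match at least one output.
#   - `outputs_match_mode=GlobExpansionConjunction.any_match` triggers it only when no glob matched.
#   - `outputs_match_mode=None` skips the check entirely.
# Whether `warn_or_raise` warns or raises is controlled by `outputs_match_error_behavior`.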

@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    request = prepared_request.original_request

    result = await execute_process(prepared_request.process, **implicitly())

    log_on_errors = request.log_on_process_errors or FrozenDict()
    error_to_log = log_on_errors.get(result.exit_code, None)
    if error_to_log:
        logger.error(error_to_log)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)

    root_output_directory: str | None = None
    if request.use_working_directory_as_base_for_output_captures:
        root_output_directory = parse_relative_directory(
            request.root_output_directory, working_directory
        )

    extras = (
        (request.capture_stdout_file, result.stdout),
        (request.capture_stderr_file, result.stderr),
    )
    extra_contents = {i: j for i, j in extras if i}

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    output_digest = result.output_digest
    output_files: list[str] = list(request.output_files)
    output_directories: list[str] = list(request.output_directories)
    if request.use_working_directory_as_base_for_output_captures:
        output_files = [
            os.path.normpath(os.path.join(working_directory, of)) for of in output_files
        ]
        output_directories = [
            os.path.normpath(os.path.join(working_directory, od)) for od in output_directories
        ]
    await check_outputs(
        output_digest=output_digest,
        output_files=output_files,
        output_directories=output_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    if extra_contents:
        if request.use_working_directory_as_base_for_output_captures:
            extra_digest = await create_digest(
                CreateDigest(
                    FileContent(_parse_relative_file(name, working_directory), content)
                    for name, content in extra_contents.items()
                )
            )
        else:
            extra_digest = await create_digest(
                CreateDigest(FileContent(name, content) for name, content in extra_contents.items())
            )

        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )


# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
# Access time is not included to avoid having mere access to a file invalidating an execution.
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
    if m is None:
        return b""

    def dt_fmt(dt: datetime | None) -> str | None:
        if dt is not None:
            return dt.isoformat()
        return None

    d = {
        "path": m.path,
        "kind": str(m.kind),
        "length": m.length,
        "is_executable": m.is_executable,
        "unix_mode": m.unix_mode,
        "created": dt_fmt(m.created),
        "modified": dt_fmt(m.modified),
        "symlink_target": m.symlink_target,
    }

    return json.dumps(d, sort_keys=True).encode()


async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    raw_paths = await path_globs_to_paths(path_globs)
    paths = sorted([*raw_paths.files, *raw_paths.dirs])
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in paths
    )

    # Compute a stable hash of all of the metadata since the hash value should be stable
    # when used outside the process (for example, in the cache). (The `__hash__` dunder method
    # computes an unstable hash which can and does vary across different process invocations.)
    #
    # While this is arguably more a point of correctness than a necessity, it does matter that a
    # single user sees the same behavior across process invocations if pantsd restarts.
    #
    # Note: This could probably use a non-cryptographic hash (e.g., Murmur), but that would require
    # a third-party dependency.
    h = hashlib.sha256()
    for mr in metadata_results:
        h.update(_path_metadata_to_bytes(mr.metadata))
    return h.hexdigest()

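# For example, for `PathGlobs(["src/config/*.json"])` the hash covers the metadata of every matched
# path (size, mtime, mode, etc., as serialized by `_path_metadata_to_bytes`), so editing any matched
# file changes the digest. `prepare_adhoc_process` below injects this value into the process
# environment as `__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH` to force re-execution on changes.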

@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    description = request.description
    address = request.address
    working_directory = parse_relative_directory(request.working_directory or "", address)
    argv = request.argv
    timeout: int | None = request.timeout
    output_files = request.output_files
    output_directories = request.output_directories
    append_only_caches = request.append_only_caches or FrozenDict()
    immutable_input_digests = request.immutable_input_digests or FrozenDict()

    command_env: dict[str, str] = dict(request.env_vars)

    # Compute the hash for any workspace invalidation sources and put the hash into the environment as a dummy variable
    # so that the process produced by this rule will be invalidated if any of the referenced files change.
    if request.workspace_invalidation_globs is not None:
        workspace_invalidation_hash = await compute_workspace_invalidation_hash(
            request.workspace_invalidation_globs
        )
        command_env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = workspace_invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if not working_directory or working_directory in input_snapshot.dirs:
        # Needed to ensure that underlying filesystem does not change during run
        work_dir = EMPTY_DIGEST
    else:
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))

    input_digest = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    process = Process(
        argv=argv,
        description=f"Running {description}",
        env=command_env,
        input_digest=input_digest,
        output_directories=output_directories,
        output_files=output_files,
        timeout_seconds=timeout,
        working_directory=working_directory,
        append_only_caches=append_only_caches,
        immutable_input_digests=immutable_input_digests,
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        process = _output_at_build_root(process, bash)

    return PreparedAdhocProcessRequest(
        original_request=request,
        process=process,
    )


class PathEnvModifyMode(Enum):
    """How the PATH environment variable should be augmented with extra path elements."""

    PREPEND = "prepend"
    APPEND = "append"
    OFF = "off"


async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    env_vars: dict[str, str] = dict(existing_env_vars)

    to_fetch: set[str] = set()
    duplicate_keys: set[str] = set()
    for env_var in env_vars_templates:
        parts = env_var.split("=", 1)
        if parts[0] in env_vars:
            duplicate_keys.add(parts[0])

        if len(parts) == 2:
            env_vars[parts[0]] = parts[1]
        else:
            to_fetch.add(parts[0])

    if duplicate_keys:
        dups_as_str = ", ".join(sorted(duplicate_keys))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if to_fetch:
        fetched_env_vars = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(to_fetch))), **implicitly()
        )
        env_vars.update(fetched_env_vars)

    def path_env_join(left: str | None, right: str | None) -> str | None:
        if not left and not right:
            return None
        if left and not right:
            return left
        if not left and right:
            return right
        return f"{left}:{right}"

    if extra_paths:
        existing_path_env = env_vars.get("PATH")
        extra_paths_as_str = ":".join(extra_paths)

        new_path_env: str | None = None
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            new_path_env = path_env_join(extra_paths_as_str, existing_path_env)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            new_path_env = path_env_join(existing_path_env, extra_paths_as_str)

        if new_path_env:
            env_vars["PATH"] = new_path_env

    return FrozenDict(env_vars)

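# For example, with `env_vars_templates=("FOO=bar", "HOME")`, `FOO` is set to the literal value
# "bar" while `HOME` (no "=") is fetched from the ambient environment. With
# `extra_paths=("/opt/tool/bin",)` and the default `PathEnvModifyMode.PREPEND`, an existing
# `PATH=/usr/bin` becomes `/opt/tool/bin:/usr/bin`; `PathEnvModifyMode.OFF` leaves `PATH` untouched.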

def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    working_directory = process.working_directory or ""

    output_directories = process.output_directories
    output_files = process.output_files
    if working_directory:
        output_directories = tuple(os.path.join(working_directory, d) for d in output_directories)
        output_files = tuple(os.path.join(working_directory, d) for d in output_files)

    cd = f"cd {shlex.quote(working_directory)} && " if working_directory else ""
    shlexed_argv = shlex.join(process.argv)
    new_argv = (bash.path, "-c", f"{cd}{shlexed_argv}")

    return dataclasses.replace(
        process,
        argv=new_argv,
        working_directory=None,
        output_directories=output_directories,
        output_files=output_files,
    )

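# For example, a process with `argv=("make", "build")`, `working_directory="src/app"`, and
# `output_files=("out.txt",)` becomes (assuming bash at `/bin/bash`):
#   argv=("/bin/bash", "-c", "cd src/app && make build"), working_directory=None,
#   output_files=("src/app/out.txt",)
# so that captured outputs are addressed relative to the build root.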

def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`."""

    if isinstance(relative_to, Address):
        reldir = relative_to.spec_path
    else:
        reldir = relative_to

    if workdir_in == ".":
        return reldir
    elif workdir_in.startswith("./"):
        return os.path.join(reldir, workdir_in[2:])
    elif workdir_in.startswith("/"):
        return workdir_in[1:]
    else:
        return workdir_in

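# For example, with `relative_to="src/project"` (an `Address` behaves the same via its `spec_path`):
#   parse_relative_directory(".", "src/project")      -> "src/project"
#   parse_relative_directory("./sub", "src/project")  -> "src/project/sub"
#   parse_relative_directory("/dist", "src/project")  -> "dist"  (relative to the build root)
#   parse_relative_directory("other", "src/project")  -> "other"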

def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`."""

    if file_in.startswith("/"):
        return file_in[1:]

    return os.path.join(relative_to, file_in)


def rules():
    return (
        *collect_rules(),
        *process.rules(),
    )