
pantsbuild / pants, build 19529437518 (push, via GitHub web-flow)

20 Nov 2025 07:44AM UTC coverage: 78.884% (-1.4%) from 80.302%

nfpm.native_libs: Add RPM package depends from packaged pex_binaries (#22899)

## PR Series Overview

This is the second in a series of PRs that introduces a new backend:
`pants.backend.nfpm.native_libs`
Initially, the backend will be available as:
`pants.backend.experimental.nfpm.native_libs`

I proposed this new backend (originally named `bindeps`) in discussion
#22396.

This backend will inspect ELF bin/lib files (like `lib*.so`) in packaged
contents (for this PR series, only in `pex_binary` targets) to identify
package dependency metadata and inject that metadata on the relevant
`nfpm_deb_package` or `nfpm_rpm_package` targets. Effectively, it will
provide an approximation of these native packager features:
- `rpm`: `rpmdeps` + `elfdeps`
- `deb`: `dh_shlibdeps` + `dpkg-shlibdeps` (these substitute
`${shlibs:Depends}` in debian control files)
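
For illustration only, here is a minimal sketch of the kind of ELF inspection involved: reading the `DT_NEEDED` and `DT_SONAME` entries that tools like `elfdeps` and `dpkg-shlibdeps` build their dependency metadata from. The use of `pyelftools` and the function name are assumptions for this sketch, not necessarily how the backend is implemented.

```python
# Illustrative sketch only: list the shared-library names an ELF file declares
# it needs (DT_NEEDED) plus its own SONAME, which are the raw inputs that
# elfdeps / dpkg-shlibdeps turn into package dependency metadata.
# pyelftools is an assumption here, not necessarily what the backend uses.
from elftools.elf.elffile import ELFFile


def elf_dependency_info(path: str) -> tuple[str | None, list[str]]:
    """Return (soname, needed_libraries) for one ELF binary or shared library."""
    with open(path, "rb") as f:
        elf = ELFFile(f)
        dynamic = elf.get_section_by_name(".dynamic")
        if dynamic is None:  # statically linked or not a dynamic object
            return None, []
        soname: str | None = None
        needed: list[str] = []
        for tag in dynamic.iter_tags():
            if tag.entry.d_tag == "DT_NEEDED":
                needed.append(tag.needed)
            elif tag.entry.d_tag == "DT_SONAME":
                soname = tag.soname
        return soname, needed


# Example: elf_dependency_info("libfoo.so") might return
# ("libfoo.so.1", ["libc.so.6", "libssl.so.3"]).
```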

### Goal: Host-agnostic package builds

This pants backend is designed to be host-agnostic, like
[nFPM](https://nfpm.goreleaser.com/).

Native packaging tools are often restricted to a single release of a
single distro. Unlike native package builders, this new pants backend
does not use any distro-specific or distro-release-specific utilities or
local package databases. So, it should be able to help build deb and rpm
packages anywhere that pants can run (macOS, rpm-based Linux distros,
deb-based Linux distros, other Linux distros, docker, ...).

### Previous PRs in series

- #22873

## PR Overview

This PR adds rules in `nfpm.native_libs` that add package dependency
metadata to `nfpm_rpm_package` targets. The two new rules are:

- `inject_native_libs_dependencies_in_package_fields`:

    - An implementation of the polymorphic rule `inject_nfpm_package_fields`.
      This rule is low priority (`priority = 2`) so that in-repo plugins can
      override/augment what it injects. (See #22864)

    - Rule logic overview:
        - find any pex_binaries that will be packaged in an `nfpm_rpm_package`
   ... (continued)
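
For a concrete picture of the intended effect, here is a purely hypothetical BUILD file sketch. The target and field names are illustrative assumptions rather than output from this PR; the comment shows the style of ELF-derived RPM requirements (versioned glibc symbols and SONAMEs) that this kind of injection replaces.

```python
# Hypothetical BUILD file, for illustration only. Field names are assumptions
# about the nfpm backend's API, not taken from this PR.
pex_binary(
    name="app",
    entry_point="my_app.main",
)

nfpm_rpm_package(
    name="app-rpm",
    package_name="my-app",
    version="1.0.0",
    dependencies=[":app"],
    # Without this backend, ELF-derived requirements such as
    #   "libc.so.6(GLIBC_2.34)(64bit)" or "libssl.so.3()(64bit)"
    # would have to be maintained by hand. With nfpm.native_libs enabled,
    # the equivalent metadata is injected automatically from the ELF files
    # inside the packaged pex_binary.
)
```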

96 of 118 new or added lines in 3 files covered. (81.36%)

910 existing lines in 53 files now uncovered.

73897 of 93678 relevant lines covered (78.88%)

3.21 hits per line

Source File: /src/python/pants/core/util_rules/adhoc_process_support.py (36.64% covered)

# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations

import dataclasses
import hashlib
import json
import logging
import os
import shlex
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from textwrap import dedent  # noqa: PNT20
from typing import TypeVar

from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
from pants.build_graph.address import Address
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
from pants.core.goals.package import (
    EnvironmentAwarePackageRequest,
    PackageFieldSet,
    environment_aware_package,
)
from pants.core.goals.run import RunFieldSet, generate_run_in_sandbox_request
from pants.core.target_types import FileSourceField
from pants.core.util_rules.env_vars import environment_vars_subset
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
from pants.core.util_rules.system_binaries import BashBinary
from pants.engine import process
from pants.engine.addresses import Addresses, UnparsedAddressInputs
from pants.engine.env_vars import EnvironmentVarsRequest
from pants.engine.environment import EnvironmentName
from pants.engine.fs import (
    EMPTY_DIGEST,
    CreateDigest,
    Digest,
    DigestSubset,
    Directory,
    FileContent,
    GlobExpansionConjunction,
    MergeDigests,
    PathGlobs,
    PathMetadataRequest,
    Snapshot,
)
from pants.engine.internals.build_files import resolve_address
from pants.engine.internals.graph import (
    find_valid_field_sets,
    resolve_target,
    resolve_targets,
    resolve_unparsed_address_inputs,
    transitive_targets,
)
from pants.engine.internals.native_engine import AddressInput, PathMetadata, RemovePrefix
from pants.engine.intrinsics import (
    create_digest,
    digest_subset_to_digest,
    digest_to_snapshot,
    execute_process,
    merge_digests,
    path_globs_to_paths,
    path_metadata_request,
    remove_prefix,
)
from pants.engine.process import (
    FallibleProcessResult,
    Process,
    ProcessCacheScope,
    ProcessResult,
    ProductDescription,
    fallible_to_exec_result_or_raise,
)
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
from pants.engine.target import (
    FieldSetsPerTargetRequest,
    InvalidFieldException,
    SourcesField,
    Target,
    TransitiveTargetsRequest,
    WrappedTargetRequest,
)
from pants.util.frozendict import FrozenDict

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AdhocProcessRequest:
    description: str
    address: Address
    working_directory: str
    root_output_directory: str
    argv: tuple[str, ...]
    timeout: int | None
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest] | None
    append_only_caches: FrozenDict[str, str] | None
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    env_vars: FrozenDict[str, str]
    log_on_process_errors: FrozenDict[int, str] | None
    log_output: bool
    capture_stdout_file: str | None
    capture_stderr_file: str | None
    workspace_invalidation_globs: PathGlobs | None
    cache_scope: ProcessCacheScope | None = None
    use_working_directory_as_base_for_output_captures: bool = True
    outputs_match_error_behavior: GlobMatchErrorBehavior = GlobMatchErrorBehavior.error
    outputs_match_conjunction: GlobExpansionConjunction | None = GlobExpansionConjunction.all_match


@dataclass(frozen=True)
class PreparedAdhocProcessRequest:
    original_request: AdhocProcessRequest
    process: Process


@dataclass(frozen=True)
class FallibleAdhocProcessResult:
    process_result: FallibleProcessResult
    adjusted_digest: Digest
    description: str


@dataclass(frozen=True)
class AdhocProcessResult:
    process_result: ProcessResult
    adjusted_digest: Digest


@dataclass(frozen=True)
class ResolveExecutionDependenciesRequest:
    address: Address
    execution_dependencies: tuple[str, ...] | None
    runnable_dependencies: tuple[str, ...] | None


@dataclass(frozen=True)
class ResolvedExecutionDependencies:
    digest: Digest
    runnable_dependencies: RunnableDependencies | None


@dataclass(frozen=True)
class RunnableDependencies:
    path_component: str
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class ToolRunnerRequest:
    runnable_address_str: str
    args: tuple[str, ...]
    execution_dependencies: tuple[str, ...]
    runnable_dependencies: tuple[str, ...]
    target: Target
    runnable_address_field_alias: str
    named_caches: FrozenDict[str, str] | None = None


@dataclass(frozen=True)
class ToolRunner:
    digest: Digest
    args: tuple[str, ...]
    extra_env: FrozenDict[str, str]
    extra_paths: tuple[str, ...]
    append_only_caches: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]


#
# Things that need a home
#


@dataclass(frozen=True)
class ExtraSandboxContents:
    digest: Digest
    paths: tuple[str, ...]
    immutable_input_digests: Mapping[str, Digest]
    append_only_caches: Mapping[str, str]
    extra_env: Mapping[str, str]


@dataclass(frozen=True)
class MergeExtraSandboxContents:
    additions: tuple[ExtraSandboxContents, ...]


@rule
async def merge_extra_sandbox_contents(request: MergeExtraSandboxContents) -> ExtraSandboxContents:
    additions = request.additions

    digests: list[Digest] = []
    paths: list[str] = []
    immutable_input_digests: dict[str, Digest] = {}
    append_only_caches: dict[str, str] = {}
    extra_env: dict[str, str] = {}

    for addition in additions:
        digests.append(addition.digest)
        if addition.paths:
            paths.extend(addition.paths)
        _safe_update(immutable_input_digests, addition.immutable_input_digests)
        _safe_update(append_only_caches, addition.append_only_caches)
        _safe_update(extra_env, addition.extra_env)

    digest = await merge_digests(MergeDigests(digests))

    return ExtraSandboxContents(
        digest=digest,
        paths=tuple(paths),
        immutable_input_digests=FrozenDict(immutable_input_digests),
        append_only_caches=FrozenDict(append_only_caches),
        extra_env=FrozenDict(extra_env),
    )


#
# END THINGS THAT NEED A HOME
#


@rule
async def convert_fallible_adhoc_process_result(
    fallible_result: FallibleAdhocProcessResult,
) -> AdhocProcessResult:
    result = await fallible_to_exec_result_or_raise(
        **implicitly(
            {
                fallible_result.process_result: FallibleProcessResult,
                ProductDescription(fallible_result.description): ProductDescription,
            }
        )
    )
    return AdhocProcessResult(
        process_result=result, adjusted_digest=fallible_result.adjusted_digest
    )


async def _resolve_runnable_dependencies(
    bash: BashBinary, deps: tuple[str, ...] | None, owning: Address, origin: str
) -> tuple[Digest, RunnableDependencies | None]:
    if not deps:
        return EMPTY_DIGEST, None

    addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            (dep for dep in deps),
            owning_address=owning,
            description_of_origin=origin,
        ),
        **implicitly(),
    )

    targets = await resolve_targets(**implicitly({addresses: Addresses}))
    fspt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(RunFieldSet, targets), **implicitly()
    )

    for address, field_set in zip(addresses, fspt.collection):
        if not field_set:
            raise ValueError(
                dedent(
                    f"""\
                    Address `{address.spec}` was specified as a runnable dependency, but is not
                    runnable.
                    """
                )
            )

    runnables = await concurrently(
        generate_run_in_sandbox_request(**implicitly({field_set[0]: RunFieldSet}))
        for field_set in fspt.collection
    )

    shims: list[FileContent] = []
    extras: list[ExtraSandboxContents] = []

    for address, runnable in zip(addresses, runnables):
        extras.append(
            ExtraSandboxContents(
                digest=runnable.digest,
                paths=(),
                immutable_input_digests=FrozenDict(runnable.immutable_input_digests or {}),
                append_only_caches=FrozenDict(runnable.append_only_caches or {}),
                extra_env=FrozenDict(),
            )
        )
        shims.append(
            FileContent(
                address.target_name,
                _runnable_dependency_shim(bash.path, runnable.args, runnable.extra_env),
                is_executable=True,
            )
        )

    merged_extras, shim_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extras))),
        create_digest(CreateDigest(shims)),
    )

    shim_digest_path = f"_runnable_dependency_shims_{shim_digest.fingerprint}"
    immutable_input_digests = {shim_digest_path: shim_digest}
    _safe_update(immutable_input_digests, merged_extras.immutable_input_digests)

    return (
        merged_extras.digest,
        RunnableDependencies(
            shim_digest_path,
            FrozenDict(immutable_input_digests),
            merged_extras.append_only_caches,
            FrozenDict({"_PANTS_SHIM_ROOT": "{chroot}"}),
        ),
    )


@rule
async def resolve_execution_environment(
    request: ResolveExecutionDependenciesRequest,
    bash: BashBinary,
) -> ResolvedExecutionDependencies:
    target_address = request.address
    raw_execution_dependencies = request.execution_dependencies

    # Always include the execution dependencies that were specified
    if raw_execution_dependencies is not None:
        _descr = f"the `execution_dependencies` from the target {target_address}"
        execution_dependencies = await resolve_unparsed_address_inputs(
            UnparsedAddressInputs(
                raw_execution_dependencies,
                owning_address=target_address,
                description_of_origin=_descr,
            ),
            **implicitly(),
        )
    else:
        execution_dependencies = Addresses(())

    transitive = await transitive_targets(
        TransitiveTargetsRequest(execution_dependencies), **implicitly()
    )

    all_dependencies = (
        *(i for i in transitive.roots if i.address is not target_address),
        *transitive.dependencies,
    )

    sources, pkgs_per_target = await concurrently(
        determine_source_files(
            SourceFilesRequest(
                sources_fields=[tgt.get(SourcesField) for tgt in all_dependencies],
                for_sources_types=(SourcesField, FileSourceField),
                enable_codegen=True,
            )
        ),
        find_valid_field_sets(
            FieldSetsPerTargetRequest(PackageFieldSet, all_dependencies), **implicitly()
        ),
    )

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in pkgs_per_target.field_sets
    )

    _descr = f"the `runnable_dependencies` from the target {target_address}"
    runnables_digest, runnable_dependencies = await _resolve_runnable_dependencies(
        bash, request.runnable_dependencies, target_address, _descr
    )

    dependencies_digest = await merge_digests(
        MergeDigests([sources.snapshot.digest, runnables_digest, *(pkg.digest for pkg in packages)])
    )

    return ResolvedExecutionDependencies(dependencies_digest, runnable_dependencies)


K = TypeVar("K")
V = TypeVar("V")


def _safe_update(d1: dict[K, V], d2: Mapping[K, V]) -> dict[K, V]:
    """Updates `d1` with the values from `d2`, raising an exception if a key exists in both
    dictionaries, but with a different value."""

    for k, v in d2.items():
        if k in d1 and d1[k] != v:
            raise ValueError(f"Key {k} was specified in both dictionaries with different values.")
        d1[k] = v
    return d1


def _runnable_dependency_shim(
    bash: str, args: Iterable[str], extra_env: Mapping[str, str]
) -> bytes:
    """The binary shim script to be placed in the output directory for the digest."""

    def _quote(s: str) -> str:
        quoted = shlex.quote(s)
        return quoted.replace("{chroot}", "'${_PANTS_SHIM_ROOT}'")

    binary = " ".join(_quote(arg) for arg in args)
    env_str = "\n".join(
        f"export {shlex.quote(key)}={_quote(value)}" for (key, value) in extra_env.items()
    )
    return dedent(
        f"""\
        #!{bash}
        {env_str}
        exec {binary} "$@"
        """
    ).encode()


@rule
async def create_tool_runner(
    request: ToolRunnerRequest,
) -> ToolRunner:
    runnable_address = await resolve_address(
        **implicitly(
            {
                AddressInput.parse(
                    request.runnable_address_str,
                    relative_to=request.target.address.spec_path,
                    description_of_origin=f"Runnable target for {request.target.address.spec_path}",
                ): AddressInput
            }
        )
    )

    addresses = Addresses((runnable_address,))
    addresses.expect_single()

    runnable_targets = await resolve_targets(**implicitly({addresses: Addresses}))

    run_field_sets, environment_name = await concurrently(
        find_valid_field_sets(
            FieldSetsPerTargetRequest(RunFieldSet, runnable_targets), **implicitly()
        ),
        resolve_environment_name(
            EnvironmentNameRequest.from_target(request.target), **implicitly()
        ),
    )

    if not run_field_sets.field_sets:
        raise InvalidFieldException(
            f"The `{request.runnable_address_field_alias}` field in target `{request.target.address}` must be set to the "
            f"address of a target which can be run by the `run` goal. Instead, the `{request.runnable_address_field_alias}` field is "
            f"set to `{request.runnable_address_str}` which refers to the `{runnable_targets[0].alias}` target "
            f"at `{runnable_targets[0].address}` that cannot be run by the `run` goal."
        )

    run_field_set: RunFieldSet = run_field_sets.field_sets[0]

    req = ResolveExecutionDependenciesRequest(
        address=request.target.address,
        execution_dependencies=request.execution_dependencies,
        runnable_dependencies=request.runnable_dependencies,
    )
    execution_environment = await resolve_execution_environment(
        **implicitly({req: ResolveExecutionDependenciesRequest, environment_name: EnvironmentName}),
    )

    # Must be run in target environment so that the binaries/envvars match the execution
    # environment when we actually run the process.
    run_request = await generate_run_in_sandbox_request(
        **implicitly({run_field_set: RunFieldSet, environment_name: EnvironmentName})
    )

    dependencies_digest = execution_environment.digest
    runnable_dependencies = execution_environment.runnable_dependencies

    extra_env: dict[str, str] = dict(run_request.extra_env or {})
    extra_path = extra_env.pop("PATH", None)

    extra_sandbox_contents = []

    extra_sandbox_contents.append(
        ExtraSandboxContents(
            digest=EMPTY_DIGEST,
            paths=(extra_path,) if extra_path else (),
            immutable_input_digests=run_request.immutable_input_digests or FrozenDict(),
            append_only_caches=run_request.append_only_caches or FrozenDict(),
            extra_env=run_request.extra_env or FrozenDict(),
        )
    )

    if runnable_dependencies:
        extra_sandbox_contents.append(
            ExtraSandboxContents(
                digest=EMPTY_DIGEST,
                paths=(f"{{chroot}}/{runnable_dependencies.path_component}",),
                immutable_input_digests=runnable_dependencies.immutable_input_digests,
                append_only_caches=runnable_dependencies.append_only_caches,
                extra_env=runnable_dependencies.extra_env,
            )
        )

    merged_extras, main_digest = await concurrently(
        merge_extra_sandbox_contents(MergeExtraSandboxContents(tuple(extra_sandbox_contents))),
        merge_digests(MergeDigests((dependencies_digest, run_request.digest))),
    )

    extra_env = dict(merged_extras.extra_env)

    append_only_caches = {
        **merged_extras.append_only_caches,
        **(request.named_caches or {}),
    }

    return ToolRunner(
        digest=main_digest,
        args=run_request.args + tuple(request.args),
        extra_env=FrozenDict(extra_env),
        extra_paths=merged_extras.paths,
        append_only_caches=FrozenDict(append_only_caches),
        immutable_input_digests=FrozenDict(merged_extras.immutable_input_digests),
    )


async def check_outputs(
    output_digest: Digest,
    output_files: Iterable[str],
    output_directories: Iterable[str],
    outputs_match_error_behavior: GlobMatchErrorBehavior,
    outputs_match_mode: GlobExpansionConjunction | None,
    address: Address,
) -> None:
    """Check an output digest from adhoc/shell backends to ensure that the outputs expected by the
    user do in fact exist."""

    if outputs_match_mode is None:
        return

    filtered_output_files_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_file],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_file in output_files
    )

    filtered_output_directory_digests = await concurrently(
        digest_subset_to_digest(
            DigestSubset(
                output_digest,
                PathGlobs(
                    [output_directory, os.path.join(output_directory, "**")],
                    glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
                ),
            ),
        )
        for output_directory in output_directories
    )

    filtered_output_files = tuple(zip(output_files, filtered_output_files_digests))
    filtered_output_directories = tuple(zip(output_directories, filtered_output_directory_digests))

    unused_output_files = tuple(
        f"{output_file} (from `output_files` field)"
        for output_file, digest in filtered_output_files
        if digest == EMPTY_DIGEST
    )
    unused_output_directories = tuple(
        f"{output_directory} (from `output_directories` field)"
        for output_directory, digest in filtered_output_directories
        if digest == EMPTY_DIGEST
    )

    def warn_or_raise(message: str, snapshot: Snapshot) -> None:
        unused_globs_str = ", ".join([*unused_output_files, *unused_output_directories])
        message = f"{message}\n\nThe following output globs were unused: {unused_globs_str}"

        if snapshot.dirs:
            message += f"\n\nDirectories in output ({len(snapshot.dirs)} total):"
            dirs = sorted(snapshot.dirs, key=lambda x: x.count(os.pathsep))
            if len(dirs) > 15:
                message += f" {', '.join(dirs[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(dirs)}"

        if snapshot.files:
            message += f"\n\nFiles in output ({len(snapshot.files)} total):"
            files = sorted(snapshot.files, key=lambda x: x.count(os.pathsep))
            if len(files) > 15:
                message += f" {', '.join(files[0:15])}, ... (trimmed for brevity)"
            else:
                message += f" {', '.join(files)}"

        if outputs_match_error_behavior == GlobMatchErrorBehavior.error:
            raise ValueError(message)
        else:
            logger.warning(message)

    if outputs_match_mode == GlobExpansionConjunction.all_match:
        if not unused_output_files and not unused_output_directories:
            return

        snapshot, wrapped_tgt = await concurrently(
            digest_to_snapshot(output_digest),
            resolve_target(
                WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()
            ),
        )
        warn_or_raise(
            f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `all` "
            "which requires all output globs to actually match an output.",
            snapshot,
        )

    # Otherwise it is `GlobExpansionConjunction.any_match` which means only at least one glob must match.
    total_count = len(filtered_output_files) + len(filtered_output_directories)
    unused_count = len(unused_output_files) + len(unused_output_directories)
    if total_count == 0 or unused_count < total_count:
        return
    snapshot, wrapped_tgt = await concurrently(
        digest_to_snapshot(output_digest),
        resolve_target(WrappedTargetRequest(address, "adhoc_process_support rule"), **implicitly()),
    )
    warn_or_raise(
        f"The `{wrapped_tgt.target.alias}` target at `{address}` is configured with `outputs_match_mode` set to `any` "
        "which requires at least one output glob to actually match an output.",
        snapshot,
    )


@rule
async def run_prepared_adhoc_process(
    prepared_request: PreparedAdhocProcessRequest,
) -> FallibleAdhocProcessResult:
    request = prepared_request.original_request

    result = await execute_process(prepared_request.process, **implicitly())

    log_on_errors = request.log_on_process_errors or FrozenDict()
    error_to_log = log_on_errors.get(result.exit_code, None)
    if error_to_log:
        logger.error(error_to_log)

    if request.log_output:
        if result.stdout:
            logger.info(result.stdout.decode())
        if result.stderr:
            logger.warning(result.stderr.decode())

    working_directory = parse_relative_directory(request.working_directory, request.address)

    root_output_directory: str | None = None
    if request.use_working_directory_as_base_for_output_captures:
        root_output_directory = parse_relative_directory(
            request.root_output_directory, working_directory
        )

    extras = (
        (request.capture_stdout_file, result.stdout),
        (request.capture_stderr_file, result.stderr),
    )
    extra_contents = {i: j for i, j in extras if i}

    # Check the outputs (if configured) to ensure any required glob matches in fact occurred.
    output_digest = result.output_digest
    output_files: list[str] = list(request.output_files)
    output_directories: list[str] = list(request.output_directories)
    if request.use_working_directory_as_base_for_output_captures:
        output_files = [
            os.path.normpath(os.path.join(working_directory, of)) for of in output_files
        ]
        output_directories = [
            os.path.normpath(os.path.join(working_directory, od)) for od in output_directories
        ]
    await check_outputs(
        output_digest=output_digest,
        output_files=output_files,
        output_directories=output_directories,
        outputs_match_error_behavior=request.outputs_match_error_behavior,
        outputs_match_mode=request.outputs_match_conjunction,
        address=request.address,
    )

    if extra_contents:
        if request.use_working_directory_as_base_for_output_captures:
            extra_digest = await create_digest(
                CreateDigest(
                    FileContent(_parse_relative_file(name, working_directory), content)
                    for name, content in extra_contents.items()
                )
            )
        else:
            extra_digest = await create_digest(
                CreateDigest(FileContent(name, content) for name, content in extra_contents.items())
            )

        output_digest = await merge_digests(MergeDigests((output_digest, extra_digest)))

    adjusted: Digest = output_digest
    if root_output_directory is not None:
        adjusted = await remove_prefix(RemovePrefix(output_digest, root_output_directory))

    return FallibleAdhocProcessResult(
        process_result=result,
        adjusted_digest=adjusted,
        description=request.description,
    )


# Compute a stable bytes value for a `PathMetadata` consisting of the values to be hashed.
# Access time is not included to avoid having mere access to a file invalidating an execution.
def _path_metadata_to_bytes(m: PathMetadata | None) -> bytes:
    if m is None:
        return b""

    def dt_fmt(dt: datetime | None) -> str | None:
        if dt is not None:
            return dt.isoformat()
        return None

    d = {
        "path": m.path,
        "kind": str(m.kind),
        "length": m.length,
        "is_executable": m.is_executable,
        "unix_mode": m.unix_mode,
        "created": dt_fmt(m.created),
        "modified": dt_fmt(m.modified),
        "symlink_target": m.symlink_target,
    }

    return json.dumps(d, sort_keys=True).encode()


async def compute_workspace_invalidation_hash(path_globs: PathGlobs) -> str:
    raw_paths = await path_globs_to_paths(path_globs)
    paths = sorted([*raw_paths.files, *raw_paths.dirs])
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path)) for path in paths
    )

    # Compute a stable hash of all of the metadatas since the hash value should be stable
    # when used outside the process (for example, in the cache). (The `__hash__` dunder method
    # computes an unstable hash which can and does vary across different process invocations.)
    #
    # While it could be more of an intellectual correctness point than a necessity, it does matter,
    # however, for a single user to see the same behavior across process invocations if pantsd restarts.
    #
    # Note: This could probably use a non-cryptographic hash (e.g., Murmur), but that would require
    # a third party dependency.
    h = hashlib.sha256()
    for mr in metadata_results:
        h.update(_path_metadata_to_bytes(mr.metadata))
    return h.hexdigest()


@rule
async def prepare_adhoc_process(
    request: AdhocProcessRequest,
    bash: BashBinary,
) -> PreparedAdhocProcessRequest:
    description = request.description
    address = request.address
    working_directory = parse_relative_directory(request.working_directory or "", address)
    argv = request.argv
    timeout: int | None = request.timeout
    output_files = request.output_files
    output_directories = request.output_directories
    append_only_caches = request.append_only_caches or FrozenDict()
    immutable_input_digests = request.immutable_input_digests or FrozenDict()

    command_env: dict[str, str] = dict(request.env_vars)

    # Compute the hash for any workspace invalidation sources and put the hash into the environment as a dummy variable
    # so that the process produced by this rule will be invalidated if any of the referenced files change.
    if request.workspace_invalidation_globs is not None:
        workspace_invalidation_hash = await compute_workspace_invalidation_hash(
            request.workspace_invalidation_globs
        )
        command_env["__PANTS_WORKSPACE_INVALIDATION_SOURCES_HASH"] = workspace_invalidation_hash

    input_snapshot = await digest_to_snapshot(request.input_digest)

    if not working_directory or working_directory in input_snapshot.dirs:
        # Needed to ensure that underlying filesystem does not change during run
        work_dir = EMPTY_DIGEST
    else:
        work_dir = await create_digest(CreateDigest([Directory(working_directory)]))

    input_digest = await merge_digests(MergeDigests([request.input_digest, work_dir]))

    process = Process(
        argv=argv,
        description=f"Running {description}",
        env=command_env,
        input_digest=input_digest,
        output_directories=output_directories,
        output_files=output_files,
        timeout_seconds=timeout,
        working_directory=working_directory,
        append_only_caches=append_only_caches,
        immutable_input_digests=immutable_input_digests,
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
    )

    if request.use_working_directory_as_base_for_output_captures:
        process = _output_at_build_root(process, bash)

    return PreparedAdhocProcessRequest(
        original_request=request,
        process=process,
    )


class PathEnvModifyMode(Enum):
    """How the PATH environment variable should be augmented with extra path elements."""

    PREPEND = "prepend"
    APPEND = "append"
    OFF = "off"


async def prepare_env_vars(
    existing_env_vars: Mapping[str, str],
    env_vars_templates: tuple[str, ...],
    *,
    extra_paths: tuple[str, ...] = (),
    path_env_modify_mode: PathEnvModifyMode = PathEnvModifyMode.PREPEND,
    description_of_origin: str,
) -> FrozenDict[str, str]:
    env_vars: dict[str, str] = dict(existing_env_vars)

    to_fetch: set[str] = set()
    duplicate_keys: set[str] = set()
    for env_var in env_vars_templates:
        parts = env_var.split("=", 1)
        if parts[0] in env_vars:
            duplicate_keys.add(parts[0])

        if len(parts) == 2:
            env_vars[parts[0]] = parts[1]
        else:
            to_fetch.add(parts[0])

    if duplicate_keys:
        dups_as_str = ", ".join(sorted(duplicate_keys))
        raise ValueError(
            f"The following environment variables referenced in {description_of_origin} are defined multiple times: {dups_as_str}"
        )

    if to_fetch:
        fetched_env_vars = await environment_vars_subset(
            EnvironmentVarsRequest(tuple(sorted(to_fetch))), **implicitly()
        )
        env_vars.update(fetched_env_vars)

    def path_env_join(left: str | None, right: str | None) -> str | None:
        if not left and not right:
            return None
        if left and not right:
            return left
        if not left and right:
            return right
        return f"{left}:{right}"

    if extra_paths:
        existing_path_env = env_vars.get("PATH")
        extra_paths_as_str = ":".join(extra_paths)

        new_path_env: str | None = None
        if path_env_modify_mode == PathEnvModifyMode.PREPEND:
            new_path_env = path_env_join(extra_paths_as_str, existing_path_env)
        elif path_env_modify_mode == PathEnvModifyMode.APPEND:
            new_path_env = path_env_join(existing_path_env, extra_paths_as_str)

        if new_path_env:
            env_vars["PATH"] = new_path_env

    return FrozenDict(env_vars)


def _output_at_build_root(process: Process, bash: BashBinary) -> Process:
    working_directory = process.working_directory or ""

    output_directories = process.output_directories
    output_files = process.output_files
    if working_directory:
        output_directories = tuple(os.path.join(working_directory, d) for d in output_directories)
        output_files = tuple(os.path.join(working_directory, d) for d in output_files)

    cd = f"cd {shlex.quote(working_directory)} && " if working_directory else ""
    shlexed_argv = shlex.join(process.argv)
    new_argv = (bash.path, "-c", f"{cd}{shlexed_argv}")

    return dataclasses.replace(
        process,
        argv=new_argv,
        working_directory=None,
        output_directories=output_directories,
        output_files=output_files,
    )


def parse_relative_directory(workdir_in: str, relative_to: Address | str) -> str:
    """Convert the `workdir` field into something that can be understood by `Process`."""

    if isinstance(relative_to, Address):
        reldir = relative_to.spec_path
    else:
        reldir = relative_to

    if workdir_in == ".":
        return reldir
    elif workdir_in.startswith("./"):
        return os.path.join(reldir, workdir_in[2:])
    elif workdir_in.startswith("/"):
        return workdir_in[1:]
    else:
        return workdir_in


def _parse_relative_file(file_in: str, relative_to: str) -> str:
    """Convert the `capture_std..._file` fields into something that can be understood by
    `Process`."""

    if file_in.startswith("/"):
        return file_in[1:]

    return os.path.join(relative_to, file_in)


def rules():
    return (
        *collect_rules(),
        *process.rules(),
    )