• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26080722777

19 May 2026 06:37AM UTC coverage: 52.106% (-11.5%) from 63.597%
26080722777

Pull #23250

github

web-flow
Merge 63ec06323 into 2693df832
Pull Request #23250: Feature: Add generic option to docker image

12 of 50 new or added lines in 3 files covered. (24.0%)

5382 existing lines in 201 files now uncovered.

32053 of 61515 relevant lines covered (52.11%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.68
/src/python/pants/core/util_rules/system_binaries.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import dataclasses
2✔
7
import hashlib
2✔
8
import logging
2✔
9
import os
2✔
10
import shlex
2✔
11
import subprocess
2✔
12
from collections.abc import Iterable, Mapping, Sequence
2✔
13
from dataclasses import dataclass
2✔
14
from enum import Enum
2✔
15
from itertools import groupby
2✔
16
from textwrap import dedent  # noqa: PNT20
2✔
17
from typing import Self
2✔
18

19
from pants.core.environments.target_types import EnvironmentTarget
2✔
20
from pants.core.subsystems import python_bootstrap
2✔
21
from pants.engine.collection import DeduplicatedCollection
2✔
22
from pants.engine.engine_aware import EngineAwareReturnType
2✔
23
from pants.engine.fs import CreateDigest, FileContent, PathMetadataRequest
2✔
24
from pants.engine.internals.native_engine import Digest, PathMetadataKind, PathNamespace
2✔
25
from pants.engine.internals.selectors import concurrently
2✔
26
from pants.engine.intrinsics import create_digest, execute_process, path_metadata_request
2✔
27
from pants.engine.platform import Platform
2✔
28
from pants.engine.process import Process, execute_process_or_raise
2✔
29
from pants.engine.rules import collect_rules, implicitly, rule
2✔
30
from pants.option.option_types import StrListOption
2✔
31
from pants.option.subsystem import Subsystem
2✔
32
from pants.util.frozendict import FrozenDict
2✔
33
from pants.util.logging import LogLevel
2✔
34
from pants.util.memo import memoized_property
2✔
35
from pants.util.ordered_set import OrderedSet
2✔
36
from pants.util.strutil import pluralize, softwrap
2✔
37

38
logger = logging.getLogger(__name__)
2✔
39

40

41
class SystemBinariesSubsystem(Subsystem):
2✔
42
    options_scope = "system-binaries"
2✔
43
    help = "System binaries related settings."
2✔
44

45
    class EnvironmentAware(Subsystem.EnvironmentAware):
2✔
46
        env_vars_used_by_options = ("PATH",)
2✔
47

48
        _DEFAULT_SEARCH_PATHS = ("/usr/bin", "/bin", "/usr/local/bin", "/opt/homebrew/bin")
2✔
49

50
        _system_binary_paths = StrListOption(
2✔
51
            default=[*_DEFAULT_SEARCH_PATHS],
52
            help=softwrap(
53
                """
54
                The PATH value that will searched for executables.
55

56
                The special string `"<PATH>"` will expand to the contents of the PATH env var.
57
                """
58
            ),
59
        )
60

61
        @memoized_property
2✔
62
        def system_binary_paths(self) -> SearchPath:
2✔
63
            def iter_path_entries():
2✔
64
                for entry in self._system_binary_paths:
2✔
65
                    if entry == "<PATH>":
2✔
66
                        path = self._options_env.get("PATH")
×
67
                        if path:
×
68
                            for prefix in path.split(os.pathsep):
×
69
                                if prefix.startswith("$") or prefix.startswith("~"):
×
70
                                    logger.warning(
×
71
                                        f"Ignored unexpanded path prefix `{prefix}` in the `PATH` environment variable while processing the `<PATH>` marker from the `[system-binaries].system_binary_paths` option. Please check the value of the `PATH` environment variable in your shell."
72
                                    )
73
                                else:
74
                                    yield prefix
×
75
                    else:
76
                        yield entry
2✔
77

78
            return SearchPath(iter_path_entries())
2✔
79

80

81
# -------------------------------------------------------------------------------------------
82
# `BinaryPath` types
83
# -------------------------------------------------------------------------------------------
84

85

86
@dataclass(frozen=True)
2✔
87
class BinaryPath:
2✔
88
    path: str
2✔
89
    fingerprint: str
2✔
90

91
    def __init__(self, path: str, fingerprint: str | None = None) -> None:
2✔
92
        object.__setattr__(self, "path", path)
2✔
93
        object.__setattr__(
2✔
94
            self, "fingerprint", self._fingerprint() if fingerprint is None else fingerprint
95
        )
96

97
    @staticmethod
2✔
98
    def _fingerprint(content: bytes | bytearray | memoryview | None = None) -> str:
2✔
99
        hasher = hashlib.sha256() if content is None else hashlib.sha256(content)
2✔
100
        return hasher.hexdigest()
2✔
101

102
    @classmethod
2✔
103
    def fingerprinted(
2✔
104
        cls, path: str, representative_content: bytes | bytearray | memoryview
105
    ) -> Self:
106
        return cls(path, fingerprint=cls._fingerprint(representative_content))
2✔
107

108

109
@dataclass(unsafe_hash=True)
2✔
110
class BinaryPathTest:
2✔
111
    args: tuple[str, ...]
2✔
112
    fingerprint_stdout: bool
2✔
113

114
    def __init__(self, args: Iterable[str], fingerprint_stdout: bool = True) -> None:
2✔
115
        object.__setattr__(self, "args", tuple(args))
2✔
116
        object.__setattr__(self, "fingerprint_stdout", fingerprint_stdout)
2✔
117

118

119
class SearchPath(DeduplicatedCollection[str]):
2✔
120
    """The search path for binaries; i.e.: the $PATH."""
121

122

123
@dataclass(unsafe_hash=True)
2✔
124
class BinaryPathRequest:
2✔
125
    """Request to find a binary of a given name.
126

127
    If `check_file_entries` is `True` a BinaryPathRequest will consider any entries in the
128
    `search_path` that are file paths in addition to traditional directory paths.
129

130
    If a `test` is specified all binaries that are found will be executed with the test args and
131
    only those binaries whose test executions exit with return code 0 will be retained.
132
    Additionally, if test execution includes stdout content, that will be used to fingerprint the
133
    binary path so that upgrades and downgrades can be detected. A reasonable test for many programs
134
    might be `BinaryPathTest(args=["--version"])` since it will both ensure the program runs and
135
    also produce stdout text that changes upon upgrade or downgrade of the binary at the discovered
136
    path.
137
    """
138

139
    search_path: SearchPath
2✔
140
    binary_name: str
2✔
141
    check_file_entries: bool
2✔
142
    test: BinaryPathTest | None
2✔
143

144
    def __init__(
2✔
145
        self,
146
        *,
147
        search_path: Iterable[str],
148
        binary_name: str,
149
        check_file_entries: bool = False,
150
        test: BinaryPathTest | None = None,
151
    ) -> None:
152
        object.__setattr__(self, "search_path", SearchPath(search_path))
2✔
153
        object.__setattr__(self, "binary_name", binary_name)
2✔
154
        object.__setattr__(self, "check_file_entries", check_file_entries)
2✔
155
        object.__setattr__(self, "test", test)
2✔
156

157

158
@dataclass(frozen=True)
2✔
159
class BinaryPaths(EngineAwareReturnType):
2✔
160
    binary_name: str
2✔
161
    paths: tuple[BinaryPath, ...]
2✔
162

163
    def __init__(self, binary_name: str, paths: Iterable[BinaryPath] | None = None):
2✔
164
        object.__setattr__(self, "binary_name", binary_name)
2✔
165
        object.__setattr__(self, "paths", tuple(OrderedSet(paths) if paths else ()))
2✔
166

167
    def message(self) -> str:
2✔
168
        if not self.paths:
×
169
            return f"failed to find {self.binary_name}"
×
170
        found_msg = f"found {self.binary_name} at {self.paths[0]}"
×
171
        if len(self.paths) > 1:
×
172
            found_msg = f"{found_msg} and {pluralize(len(self.paths) - 1, 'other location')}"
×
173
        return found_msg
×
174

175
    @property
2✔
176
    def first_path(self) -> BinaryPath | None:
2✔
177
        """Return the first path to the binary that was discovered, if any."""
178
        return next(iter(self.paths), None)
2✔
179

180
    def first_path_or_raise(self, request: BinaryPathRequest, *, rationale: str) -> BinaryPath:
2✔
181
        """Return the first path to the binary that was discovered, if any."""
182
        first_path = self.first_path
2✔
183
        if not first_path:
2✔
UNCOV
184
            raise BinaryNotFoundError.from_request(request, rationale=rationale)
×
185
        return first_path
2✔
186

187

188
class BinaryNotFoundError(EnvironmentError):
2✔
189
    @classmethod
2✔
190
    def from_request(
2✔
191
        cls,
192
        request: BinaryPathRequest,
193
        *,
194
        rationale: str | None = None,
195
        alternative_solution: str | None = None,
196
    ) -> BinaryNotFoundError:
197
        """When no binary is found via `BinaryPaths`, and it is not recoverable.
198

199
        :param rationale: A short description of why this binary is needed, e.g.
200
            "download the tools Pants needs" or "run Python programs".
201
        :param alternative_solution: A description of what else users can do to fix the issue,
202
            beyond installing the program. For example, "Alternatively, you can set the option
203
            `--python-bootstrap-search-path` to change the paths searched."
204
        """
UNCOV
205
        msg = softwrap(
×
206
            f"""
207
            Cannot find `{request.binary_name}` on `{sorted(request.search_path)}`.
208
            Please ensure that it is installed
209
            """
210
        )
UNCOV
211
        msg += f" so that Pants can {rationale}." if rationale else "."
×
UNCOV
212
        if alternative_solution:
×
213
            msg += f"\n\n{alternative_solution}"
×
UNCOV
214
        return BinaryNotFoundError(msg)
×
215

216

217
# -------------------------------------------------------------------------------------------
218
# Binary shims
219
# Creates a Digest with a shim for each requested binary in a directory suitable for PATH.
220
# -------------------------------------------------------------------------------------------
221

222

223
@dataclass(frozen=True)
2✔
224
class BinaryShimsRequest:
2✔
225
    """Request to create shims for one or more system binaries."""
226

227
    rationale: str = dataclasses.field(compare=False)
2✔
228

229
    # Create shims for provided binary paths
230
    paths: tuple[BinaryPath, ...] = tuple()
2✔
231

232
    # Create shims for the provided binary names after looking up the paths.
233
    requests: tuple[BinaryPathRequest, ...] = tuple()
2✔
234

235
    @classmethod
2✔
236
    def for_binaries(
2✔
237
        cls, *names: str, rationale: str, search_path: Sequence[str]
238
    ) -> BinaryShimsRequest:
239
        return cls(
2✔
240
            requests=tuple(
241
                BinaryPathRequest(binary_name=binary_name, search_path=search_path)
242
                for binary_name in names
243
            ),
244
            rationale=rationale,
245
        )
246

247
    @classmethod
2✔
248
    def for_paths(
2✔
249
        cls,
250
        *paths: BinaryPath,
251
        rationale: str,
252
    ) -> BinaryShimsRequest:
253
        # Remove any duplicates (which may result if the caller merges `BinaryPath` instances from multiple sources)
254
        # and also sort to ensure a stable order for better caching.
255
        paths = tuple(sorted(set(paths), key=lambda bp: bp.path))
2✔
256

257
        # Then ensure that there are no duplicate paths with mismatched content.
258
        duplicate_paths = set()
2✔
259
        for path, group in groupby(paths, key=lambda x: x.path):
2✔
260
            if len(list(group)) > 1:
2✔
261
                duplicate_paths.add(path)
×
262
        if duplicate_paths:
2✔
263
            raise ValueError(
×
264
                "Detected duplicate paths with mismatched content at paths: "
265
                f"{', '.join(sorted(duplicate_paths))}"
266
            )
267

268
        return cls(
2✔
269
            paths=paths,
270
            rationale=rationale,
271
        )
272

273

274
@dataclass(frozen=True)
2✔
275
class BinaryShims:
2✔
276
    """The shims created for a BinaryShimsRequest are placed in `bin_directory` of the `digest`.
277

278
    The purpose of these shims is so that a Process may be executed with `immutable_input_digests`
279
    provided to the `Process`, and `path_component` included in its `PATH` environment variable.
280

281
    The alternative is to add the directories hosting the binaries directly, but that opens up for
282
    many more unrelated binaries to also be executable from PATH, leaking into the sandbox
283
    unnecessarily.
284
    """
285

286
    digest: Digest
2✔
287
    cache_name: str
2✔
288

289
    @property
2✔
290
    def immutable_input_digests(self) -> Mapping[str, Digest]:
2✔
291
        return FrozenDict({self.cache_name: self.digest})
2✔
292

293
    @property
2✔
294
    def path_component(self) -> str:
2✔
295
        return os.path.join("{chroot}", self.cache_name)
2✔
296

297

298
# -------------------------------------------------------------------------------------------
299
# Binaries
300
# -------------------------------------------------------------------------------------------
301

302

303
class BashBinary(BinaryPath):
2✔
304
    """The `bash` binary."""
305

306
    DEFAULT_SEARCH_PATH = SearchPath(("/usr/bin", "/bin", "/usr/local/bin"))
2✔
307

308

309
class RealpathBinary(BinaryPath):
2✔
310
    pass
2✔
311

312

313
# Note that updating this will impact the `archive` target defined in `core/target_types.py`.
314
class ArchiveFormat(Enum):
2✔
315
    TAR = "tar"
2✔
316
    TGZ = "tar.gz"
2✔
317
    TBZ2 = "tar.bz2"
2✔
318
    TXZ = "tar.xz"
2✔
319
    ZIP = "zip"
2✔
320

321

322
class ZipBinary(BinaryPath):
2✔
323
    pass
2✔
324

325

326
class UnzipBinary(BinaryPath):
2✔
327
    def extract_archive_argv(self, archive_path: str, extract_path: str) -> tuple[str, ...]:
2✔
328
        # Note that the `output_dir` does not need to already exist.
329
        # The caller should validate that it's a valid `.zip` file.
330
        return (self.path, archive_path, "-d", extract_path)
2✔
331

332

333
@dataclass(frozen=True)
2✔
334
class TarBinary(BinaryPath):
2✔
335
    platform: Platform
2✔
336

337
    def create_archive_argv(
2✔
338
        self,
339
        output_filename: str,
340
        tar_format: ArchiveFormat,
341
        *,
342
        input_files: Sequence[str] = (),
343
        input_file_list_filename: str | None = None,
344
    ) -> tuple[str, ...]:
345
        # Note that the parent directory for the output_filename must already exist.
346
        #
347
        # We do not use `-a` (auto-set compression) because it does not work with older tar
348
        # versions. Not all tar implementations will support these compression formats - in that
349
        # case, the user will need to choose a different format.
350
        compression = {ArchiveFormat.TGZ: "z", ArchiveFormat.TBZ2: "j", ArchiveFormat.TXZ: "J"}.get(
×
351
            tar_format, ""
352
        )
353

354
        files_from = ("--files-from", input_file_list_filename) if input_file_list_filename else ()
×
355
        return (self.path, f"c{compression}f", output_filename, *input_files) + files_from
×
356

357
    def extract_archive_argv(
2✔
358
        self, archive_path: str, extract_path: str, *, archive_suffix: str
359
    ) -> tuple[str, ...]:
360
        # Note that the `output_dir` must already exist.
361
        # The caller should validate that it's a valid `.tar` file.
362
        prog_args = (
2✔
363
            ("-Ilz4",) if archive_suffix == ".tar.lz4" and not self.platform.is_macos else ()
364
        )
365
        return (self.path, *prog_args, "-xf", archive_path, "-C", extract_path)
2✔
366

367

368
class AwkBinary(BinaryPath):
2✔
369
    pass
2✔
370

371

372
class Base64Binary(BinaryPath):
2✔
373
    pass
2✔
374

375

376
class BasenameBinary(BinaryPath):
2✔
377
    pass
2✔
378

379

380
class Bzip2Binary(BinaryPath):
2✔
381
    pass
2✔
382

383

384
class Bzip3Binary(BinaryPath):
2✔
385
    pass
2✔
386

387

388
class CatBinary(BinaryPath):
2✔
389
    pass
2✔
390

391

392
class ChmodBinary(BinaryPath):
2✔
393
    pass
2✔
394

395

396
class CksumBinary(BinaryPath):
2✔
397
    pass
2✔
398

399

400
class CpBinary(BinaryPath):
2✔
401
    pass
2✔
402

403

404
class CutBinary(BinaryPath):
2✔
405
    pass
2✔
406

407

408
class DateBinary(BinaryPath):
2✔
409
    pass
2✔
410

411

412
class DdBinary(BinaryPath):
2✔
413
    pass
2✔
414

415

416
class DfBinary(BinaryPath):
2✔
417
    pass
2✔
418

419

420
class DiffBinary(BinaryPath):
2✔
421
    pass
2✔
422

423

424
class DirnameBinary(BinaryPath):
2✔
425
    pass
2✔
426

427

428
class DuBinary(BinaryPath):
2✔
429
    pass
2✔
430

431

432
class ExprBinary(BinaryPath):
2✔
433
    pass
2✔
434

435

436
class FindBinary(BinaryPath):
2✔
437
    pass
2✔
438

439

440
class GetentBinary(BinaryPath):
2✔
441
    pass
2✔
442

443

444
class GpgBinary(BinaryPath):
2✔
445
    pass
2✔
446

447

448
class GzipBinary(BinaryPath):
2✔
449
    pass
2✔
450

451

452
class HeadBinary(BinaryPath):
2✔
453
    pass
2✔
454

455

456
class IdBinary(BinaryPath):
2✔
457
    pass
2✔
458

459

460
class LnBinary(BinaryPath):
2✔
461
    pass
2✔
462

463

464
class Lz4Binary(BinaryPath):
2✔
465
    pass
2✔
466

467

468
class LzopBinary(BinaryPath):
2✔
469
    pass
2✔
470

471

472
class Md5sumBinary(BinaryPath):
2✔
473
    pass
2✔
474

475

476
class MkdirBinary(BinaryPath):
2✔
477
    pass
2✔
478

479

480
class MktempBinary(BinaryPath):
2✔
481
    pass
2✔
482

483

484
class MvBinary(BinaryPath):
2✔
485
    pass
2✔
486

487

488
class OpenBinary(BinaryPath):
2✔
489
    pass
2✔
490

491

492
class PwdBinary(BinaryPath):
2✔
493
    pass
2✔
494

495

496
class ReadlinkBinary(BinaryPath):
2✔
497
    pass
2✔
498

499

500
class RmBinary(BinaryPath):
2✔
501
    pass
2✔
502

503

504
class SedBinary(BinaryPath):
2✔
505
    pass
2✔
506

507

508
class ShBinary(BinaryPath):
2✔
509
    pass
2✔
510

511

512
class ShasumBinary(BinaryPath):
2✔
513
    pass
2✔
514

515

516
class SortBinary(BinaryPath):
2✔
517
    pass
2✔
518

519

520
class TailBinary(BinaryPath):
2✔
521
    pass
2✔
522

523

524
class TestBinary(BinaryPath):
2✔
525
    pass
2✔
526

527

528
class TouchBinary(BinaryPath):
2✔
529
    pass
2✔
530

531

532
class TrBinary(BinaryPath):
2✔
533
    pass
2✔
534

535

536
class WcBinary(BinaryPath):
2✔
537
    pass
2✔
538

539

540
class XargsBinary(BinaryPath):
2✔
541
    pass
2✔
542

543

544
class XzBinary(BinaryPath):
2✔
545
    pass
2✔
546

547

548
class ZstdBinary(BinaryPath):
2✔
549
    pass
2✔
550

551

552
class GitBinaryException(Exception):
2✔
553
    pass
2✔
554

555

556
class GitBinary(BinaryPath):
2✔
557
    def _invoke_unsandboxed(self, cmd: list[str]) -> bytes:
2✔
558
        """Invoke the given git command, _without_ the sandboxing provided by the `Process` API.
559

560
        This API is for internal use only: users should prefer to consume methods of the
561
        `GitWorktree` class.
562
        """
563
        cmd = [self.path, *cmd]
×
564

565
        self._log_call(cmd)
×
566

567
        try:
×
568
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
×
569
        except OSError as e:
×
570
            # Binary DNE or is not executable
571
            cmd_str = shlex.join(cmd)
×
572
            raise GitBinaryException(f"Failed to execute command {cmd_str}: {e!r}")
×
573
        out, err = process.communicate()
×
574

575
        self._check_result(cmd, process.returncode, err.decode())
×
576

577
        return out.strip()
×
578

579
    def _check_result(self, cmd: list[str], result: int, failure_msg: str | None = None) -> None:
2✔
580
        # git diff --exit-code exits with 1 if there were differences.
581
        if result != 0 and (result != 1 or "diff" not in cmd):
×
582
            cmd_str = shlex.join(cmd)
×
583
            raise GitBinaryException(failure_msg or f"{cmd_str} failed with exit code {result}")
×
584

585
    def _log_call(self, cmd: Iterable[str]) -> None:
2✔
586
        logger.debug("Executing: " + " ".join(cmd))
×
587

588

589
# -------------------------------------------------------------------------------------------
590
# Binaries Rules
591
# -------------------------------------------------------------------------------------------
592

593

594
@rule(desc="Finding the `bash` binary", level=LogLevel.DEBUG)
2✔
595
async def get_bash(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> BashBinary:
2✔
596
    search_path = system_binaries.system_binary_paths
2✔
597

598
    request = BinaryPathRequest(
2✔
599
        binary_name="bash",
600
        search_path=search_path,
601
        test=BinaryPathTest(args=["--version"]),
602
    )
603
    paths = await find_binary(request, **implicitly())
2✔
604
    first_path = paths.first_path
2✔
605
    if not first_path:
2✔
606
        raise BinaryNotFoundError.from_request(request)
×
607
    return BashBinary(first_path.path, first_path.fingerprint)
2✔
608

609

610
async def _find_candidate_paths_via_path_metadata_lookups(
2✔
611
    request: BinaryPathRequest,
612
) -> tuple[str, ...]:
613
    search_path = [os.path.abspath(path) for path in request.search_path]
2✔
614

615
    metadata_results = await concurrently(
2✔
616
        path_metadata_request(
617
            PathMetadataRequest(path=path, namespace=PathNamespace.SYSTEM, follow_symlinks=True)
618
        )
619
        for path in search_path
620
    )
621

622
    found_paths_and_requests: list[str | PathMetadataRequest] = []
2✔
623
    file_metadata_requests: list[PathMetadataRequest] = []
2✔
624

625
    for metadata_result in metadata_results:
2✔
626
        metadata = metadata_result.metadata
2✔
627
        if not metadata:
2✔
628
            continue
2✔
629

630
        if metadata.kind == PathMetadataKind.DIRECTORY:
2✔
631
            file_metadata_request = PathMetadataRequest(
2✔
632
                path=os.path.join(metadata.path, request.binary_name),
633
                namespace=PathNamespace.SYSTEM,
634
            )
635
            found_paths_and_requests.append(file_metadata_request)
2✔
636
            file_metadata_requests.append(file_metadata_request)
2✔
637

638
        elif metadata.kind == PathMetadataKind.FILE and request.check_file_entries:
×
639
            found_paths_and_requests.append(metadata.path)
×
640

641
    file_metadata_results = await concurrently(
2✔
642
        path_metadata_request(file_metadata_request)
643
        for file_metadata_request in file_metadata_requests
644
    )
645
    file_metadata_results_by_request = dict(zip(file_metadata_requests, file_metadata_results))
2✔
646

647
    found_paths: list[str] = []
2✔
648
    for found_path_or_request in found_paths_and_requests:
2✔
649
        if isinstance(found_path_or_request, str):
2✔
650
            found_paths.append(found_path_or_request)
×
651
        else:
652
            file_metadata_result = file_metadata_results_by_request[found_path_or_request]
2✔
653
            file_metadata = file_metadata_result.metadata
2✔
654
            if not file_metadata:
2✔
655
                continue
2✔
656
            found_paths.append(file_metadata.path)
2✔
657

658
    return tuple(found_paths)
2✔
659

660

661
async def _find_candidate_paths_via_subprocess_helper(
2✔
662
    request: BinaryPathRequest, env_target: EnvironmentTarget
663
) -> tuple[str, ...]:
664
    # If we are not already locating bash, recurse to locate bash to use it as an absolute path in
665
    # our shebang. This avoids mixing locations that we would search for bash into the search paths
666
    # of the request we are servicing.
667
    # TODO(#10769): Replace this script with a statically linked native binary so we don't
668
    #  depend on either /bin/bash being available on the Process host.
669
    if request.binary_name == "bash":
×
670
        shebang = "#!/usr/bin/env bash"
×
671
    else:
672
        bash = await get_bash(**implicitly())
×
673
        shebang = f"#!{bash.path}"
×
674

675
    # HACK: For workspace environments, the `find_binary.sh` will be mounted in the "chroot" directory
676
    # which is not the current directory (since the process will execute in the workspace). Thus,
677
    # adjust the path used as argv[0] to find the script.
678
    script_name = "find_binary.sh"
×
679
    sandbox_base_path = env_target.sandbox_base_path()
×
680
    script_exec_path = (
×
681
        f"./{script_name}" if not sandbox_base_path else f"{sandbox_base_path}/{script_name}"
682
    )
683

684
    script_header = dedent(
×
685
        f"""\
686
        {shebang}
687

688
        set -euox pipefail
689

690
        CHECK_FILE_ENTRIES={"1" if request.check_file_entries else ""}
691
        """
692
    )
693
    script_body = dedent(
×
694
        """\
695
        for path in ${PATH//:/ }; do
696
            if [[ -d "${path}" ]]; then
697
                # Handle traditional directory PATH element.
698
                maybe_exe="${path}/$1"
699
            elif [[ -n "${CHECK_FILE_ENTRIES}" ]]; then
700
                # Handle PATH elements that are filenames to allow for precise selection.
701
                maybe_exe="${path}"
702
            else
703
                maybe_exe=
704
            fi
705
            if [[ "$1" == "${maybe_exe##*/}" && -f "${maybe_exe}" && -x "${maybe_exe}" ]]
706
            then
707
                echo "${maybe_exe}"
708
            fi
709
        done
710
        """
711
    )
712
    script_content = script_header + script_body
×
713
    script_digest = await create_digest(
×
714
        CreateDigest([FileContent(script_name, script_content.encode(), is_executable=True)]),
715
    )
716

717
    # Some subtle notes about executing this script:
718
    #
719
    #  - We run the script with `ProcessResult` instead of `FallibleProcessResult` so that we
720
    #      can catch bugs in the script itself, given an earlier silent failure.
721
    search_path = os.pathsep.join(request.search_path)
×
722
    result = await execute_process_or_raise(
×
723
        **implicitly(
724
            Process(
725
                description=f"Searching for `{request.binary_name}` on PATH={search_path}",
726
                level=LogLevel.DEBUG,
727
                input_digest=script_digest,
728
                argv=[script_exec_path, request.binary_name],
729
                env={"PATH": search_path},
730
                cache_scope=env_target.executable_search_path_cache_scope(),
731
            ),
732
        )
733
    )
734
    return tuple(result.stdout.decode().splitlines())
×
735

736

737
@rule
2✔
738
async def find_binary(
2✔
739
    request: BinaryPathRequest,
740
    env_target: EnvironmentTarget,
741
) -> BinaryPaths:
742
    found_paths: tuple[str, ...]
743
    if env_target.can_access_local_system_paths:
2✔
744
        found_paths = await _find_candidate_paths_via_path_metadata_lookups(request)
2✔
745
    else:
746
        found_paths = await _find_candidate_paths_via_subprocess_helper(request, env_target)
×
747

748
    if not request.test:
2✔
749
        return BinaryPaths(
2✔
750
            binary_name=request.binary_name,
751
            paths=(BinaryPath(path) for path in found_paths),
752
        )
753

754
    results = await concurrently(
2✔
755
        execute_process(
756
            Process(
757
                description=f"Test binary {path}.",
758
                level=LogLevel.DEBUG,
759
                argv=[path, *request.test.args],
760
                # NB: Since a failure is a valid result for this script, we always cache it,
761
                # regardless of success or failure.
762
                cache_scope=env_target.executable_search_path_cache_scope(cache_failures=True),
763
            ),
764
            **implicitly(),
765
        )
766
        for path in found_paths
767
    )
768
    return BinaryPaths(
2✔
769
        binary_name=request.binary_name,
770
        paths=[
771
            (
772
                BinaryPath.fingerprinted(path, result.stdout)
773
                if request.test.fingerprint_stdout
774
                else BinaryPath(path, result.stdout.decode())
775
            )
776
            for path, result in zip(found_paths, results)
777
            if result.exit_code == 0
778
        ],
779
    )
780

781

782
@rule
2✔
783
async def create_binary_shims(
2✔
784
    binary_shims_request: BinaryShimsRequest,
785
    bash: BashBinary,
786
) -> BinaryShims:
787
    """Creates a bin directory with shims for all requested binaries.
788

789
    This can be provided to a `Process` as an `immutable_input_digest`, or can be merged into the
790
    input digest.
791
    """
792

793
    paths = binary_shims_request.paths
2✔
794
    requests = binary_shims_request.requests
2✔
795
    if requests:
2✔
796
        all_binary_paths = await concurrently(
2✔
797
            find_binary(request, **implicitly()) for request in requests
798
        )
799
        first_paths = tuple(
2✔
800
            binary_paths.first_path_or_raise(request, rationale=binary_shims_request.rationale)
801
            for binary_paths, request in zip(all_binary_paths, requests)
802
        )
803
        paths += first_paths
2✔
804

805
    def _create_shim(bash: str, binary: str) -> bytes:
2✔
806
        """The binary shim script to be placed in the output directory for the digest."""
807
        return dedent(
2✔
808
            f"""\
809
            #!{bash}
810
            exec "{binary}" "$@"
811
            """
812
        ).encode()
813

814
    scripts = [
2✔
815
        FileContent(
816
            os.path.basename(path.path), _create_shim(bash.path, path.path), is_executable=True
817
        )
818
        for path in paths
819
    ]
820

821
    digest = await create_digest(CreateDigest(scripts))
2✔
822
    cache_name = f"_binary_shims_{digest.fingerprint}"
2✔
823

824
    return BinaryShims(digest, cache_name)
2✔
825

826

827
@rule(desc="Finding the `realpath` binary", level=LogLevel.DEBUG)
2✔
828
async def find_realpath(
2✔
829
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
830
) -> RealpathBinary:
831
    request = BinaryPathRequest(
2✔
832
        binary_name="realpath", search_path=system_binaries.system_binary_paths
833
    )
834
    paths = await find_binary(request, **implicitly())
2✔
835
    first_path = paths.first_path_or_raise(request, rationale="realpath file")
2✔
836
    return RealpathBinary(first_path.path, first_path.fingerprint)
2✔
837

838

839
@rule(desc="Finding the `awk` binary", level=LogLevel.DEBUG)
2✔
840
async def find_awk(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> AwkBinary:
2✔
841
    request = BinaryPathRequest(binary_name="awk", search_path=system_binaries.system_binary_paths)
2✔
842
    paths = await find_binary(request, **implicitly())
2✔
843
    first_path = paths.first_path_or_raise(request, rationale="awk file")
2✔
844
    return AwkBinary(first_path.path, first_path.fingerprint)
2✔
845

846

847
@rule(desc="Finding the `base64` binary", level=LogLevel.DEBUG)
2✔
848
async def find_base64(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Base64Binary:
2✔
849
    request = BinaryPathRequest(
×
850
        binary_name="base64", search_path=system_binaries.system_binary_paths
851
    )
852
    paths = await find_binary(request, **implicitly())
×
853
    first_path = paths.first_path_or_raise(request, rationale="base64 file")
×
854
    return Base64Binary(first_path.path, first_path.fingerprint)
×
855

856

857
@rule(desc="Finding the `basename` binary", level=LogLevel.DEBUG)
2✔
858
async def find_basename(
2✔
859
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
860
) -> BasenameBinary:
861
    request = BinaryPathRequest(
2✔
862
        binary_name="basename", search_path=system_binaries.system_binary_paths
863
    )
864
    paths = await find_binary(request, **implicitly())
2✔
865
    first_path = paths.first_path_or_raise(request, rationale="basename file")
2✔
866
    return BasenameBinary(first_path.path, first_path.fingerprint)
2✔
867

868

869
@rule(desc="Finding the `bzip2` binary", level=LogLevel.DEBUG)
2✔
870
async def find_bzip2(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Bzip2Binary:
2✔
871
    request = BinaryPathRequest(
×
872
        binary_name="bzip2", search_path=system_binaries.system_binary_paths
873
    )
874
    paths = await find_binary(request, **implicitly())
×
875
    first_path = paths.first_path_or_raise(request, rationale="bzip2 file")
×
876
    return Bzip2Binary(first_path.path, first_path.fingerprint)
×
877

878

879
@rule(desc="Finding the `bzip3` binary", level=LogLevel.DEBUG)
2✔
880
async def find_bzip3(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Bzip3Binary:
2✔
881
    request = BinaryPathRequest(
×
882
        binary_name="bzip3", search_path=system_binaries.system_binary_paths
883
    )
884
    paths = await find_binary(request, **implicitly())
×
885
    first_path = paths.first_path_or_raise(request, rationale="bzip3 file")
×
886
    return Bzip3Binary(first_path.path, first_path.fingerprint)
×
887

888

889
@rule(desc="Finding the `cat` binary", level=LogLevel.DEBUG)
2✔
890
async def find_cat(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CatBinary:
2✔
891
    request = BinaryPathRequest(binary_name="cat", search_path=system_binaries.system_binary_paths)
2✔
892
    paths = await find_binary(request, **implicitly())
2✔
893
    first_path = paths.first_path_or_raise(request, rationale="outputting content from files")
2✔
894
    return CatBinary(first_path.path, first_path.fingerprint)
2✔
895

896

897
@rule(desc="Finding the `chmod` binary", level=LogLevel.DEBUG)
2✔
898
async def find_chmod(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ChmodBinary:
2✔
899
    request = BinaryPathRequest(
2✔
900
        binary_name="chmod", search_path=system_binaries.system_binary_paths
901
    )
902
    paths = await find_binary(request, **implicitly())
2✔
903
    first_path = paths.first_path_or_raise(
2✔
904
        request, rationale="change file modes or Access Control Lists"
905
    )
906
    return ChmodBinary(first_path.path, first_path.fingerprint)
2✔
907

908

909
@rule(desc="Finding the `cksum` binary", level=LogLevel.DEBUG)
2✔
910
async def find_cksum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CksumBinary:
2✔
911
    request = BinaryPathRequest(
2✔
912
        binary_name="cksum", search_path=system_binaries.system_binary_paths
913
    )
914
    paths = await find_binary(request, **implicitly())
2✔
915
    first_path = paths.first_path_or_raise(request, rationale="cksum file")
2✔
916
    return CksumBinary(first_path.path, first_path.fingerprint)
2✔
917

918

919
@rule(desc="Finding the `cp` binary", level=LogLevel.DEBUG)
2✔
920
async def find_cp(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CpBinary:
2✔
921
    request = BinaryPathRequest(binary_name="cp", search_path=system_binaries.system_binary_paths)
2✔
922
    paths = await find_binary(request, **implicitly())
2✔
923
    first_path = paths.first_path_or_raise(request, rationale="copy files")
2✔
924
    return CpBinary(first_path.path, first_path.fingerprint)
2✔
925

926

927
@rule(desc="Finding the `cut` binary", level=LogLevel.DEBUG)
2✔
928
async def find_cut(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CutBinary:
2✔
929
    request = BinaryPathRequest(binary_name="cut", search_path=system_binaries.system_binary_paths)
2✔
930
    paths = await find_binary(request, **implicitly())
2✔
931
    first_path = paths.first_path_or_raise(request, rationale="cut file")
2✔
932
    return CutBinary(first_path.path, first_path.fingerprint)
2✔
933

934

935
@rule(desc="Finding the `date` binary", level=LogLevel.DEBUG)
2✔
936
async def find_date(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DateBinary:
2✔
937
    request = BinaryPathRequest(binary_name="date", search_path=system_binaries.system_binary_paths)
2✔
938
    paths = await find_binary(request, **implicitly())
2✔
939
    first_path = paths.first_path_or_raise(request, rationale="date file")
2✔
940
    return DateBinary(first_path.path, first_path.fingerprint)
2✔
941

942

943
@rule(desc="Finding the `dd` binary", level=LogLevel.DEBUG)
2✔
944
async def find_dd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DdBinary:
2✔
945
    request = BinaryPathRequest(binary_name="dd", search_path=system_binaries.system_binary_paths)
2✔
946
    paths = await find_binary(request, **implicitly())
2✔
947
    first_path = paths.first_path_or_raise(request, rationale="dd file")
2✔
948
    return DdBinary(first_path.path, first_path.fingerprint)
2✔
949

950

951
@rule(desc="Finding the `df` binary", level=LogLevel.DEBUG)
2✔
952
async def find_df(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DfBinary:
2✔
953
    request = BinaryPathRequest(binary_name="df", search_path=system_binaries.system_binary_paths)
2✔
954
    paths = await find_binary(request, **implicitly())
2✔
955
    first_path = paths.first_path_or_raise(request, rationale="df file")
2✔
956
    return DfBinary(first_path.path, first_path.fingerprint)
2✔
957

958

959
@rule(desc="Finding the `diff` binary", level=LogLevel.DEBUG)
2✔
960
async def find_diff(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DiffBinary:
2✔
UNCOV
961
    request = BinaryPathRequest(binary_name="diff", search_path=system_binaries.system_binary_paths)
×
UNCOV
962
    paths = await find_binary(request, **implicitly())
×
UNCOV
963
    first_path = paths.first_path_or_raise(request, rationale="compare files line by line")
×
UNCOV
964
    return DiffBinary(first_path.path, first_path.fingerprint)
×
965

966

967
@rule(desc="Finding the `dirname` binary", level=LogLevel.DEBUG)
2✔
968
async def find_dirname(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DirnameBinary:
2✔
969
    request = BinaryPathRequest(
2✔
970
        binary_name="dirname", search_path=system_binaries.system_binary_paths
971
    )
972
    paths = await find_binary(request, **implicitly())
2✔
973
    first_path = paths.first_path_or_raise(request, rationale="dirname file")
2✔
974
    return DirnameBinary(first_path.path, first_path.fingerprint)
2✔
975

976

977
@rule(desc="Finding the `du` binary", level=LogLevel.DEBUG)
2✔
978
async def find_du(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DuBinary:
2✔
979
    request = BinaryPathRequest(binary_name="du", search_path=system_binaries.system_binary_paths)
2✔
980
    paths = await find_binary(request, **implicitly())
2✔
981
    first_path = paths.first_path_or_raise(request, rationale="du file")
2✔
982
    return DuBinary(first_path.path, first_path.fingerprint)
2✔
983

984

985
@rule(desc="Finding the `expr` binary", level=LogLevel.DEBUG)
2✔
986
async def find_expr(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ExprBinary:
2✔
987
    request = BinaryPathRequest(binary_name="expr", search_path=system_binaries.system_binary_paths)
2✔
988
    paths = await find_binary(request, **implicitly())
2✔
989
    first_path = paths.first_path_or_raise(request, rationale="expr file")
2✔
990
    return ExprBinary(first_path.path, first_path.fingerprint)
2✔
991

992

993
@rule(desc="Finding the `find` binary", level=LogLevel.DEBUG)
2✔
994
async def find_find(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> FindBinary:
2✔
995
    request = BinaryPathRequest(binary_name="find", search_path=system_binaries.system_binary_paths)
2✔
996
    paths = await find_binary(request, **implicitly())
2✔
997
    first_path = paths.first_path_or_raise(request, rationale="find file")
2✔
998
    return FindBinary(first_path.path, first_path.fingerprint)
2✔
999

1000

1001
@rule(desc="Finding the `git` binary", level=LogLevel.DEBUG)
2✔
1002
async def find_git(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GitBinary:
2✔
1003
    request = BinaryPathRequest(binary_name="git", search_path=system_binaries.system_binary_paths)
×
1004
    paths = await find_binary(request, **implicitly())
×
1005
    first_path = paths.first_path_or_raise(
×
1006
        request, rationale="track changes to files in your build environment"
1007
    )
1008
    return GitBinary(first_path.path, first_path.fingerprint)
×
1009

1010

1011
@rule(desc="Finding the `getent` binary", level=LogLevel.DEBUG)
2✔
1012
async def find_getent(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GetentBinary:
2✔
1013
    request = BinaryPathRequest(
×
1014
        binary_name="getent", search_path=system_binaries.system_binary_paths
1015
    )
1016
    paths = await find_binary(request, **implicitly())
×
1017
    first_path = paths.first_path_or_raise(request, rationale="getent file")
×
1018
    return GetentBinary(first_path.path, first_path.fingerprint)
×
1019

1020

1021
@rule(desc="Finding the `gpg` binary", level=LogLevel.DEBUG)
2✔
1022
async def find_gpg(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GpgBinary:
2✔
1023
    request = BinaryPathRequest(binary_name="gpg", search_path=system_binaries.system_binary_paths)
×
1024
    paths = await find_binary(request, **implicitly())
×
1025
    first_path = paths.first_path_or_raise(request, rationale="gpg file")
×
1026
    return GpgBinary(first_path.path, first_path.fingerprint)
×
1027

1028

1029
@rule(desc="Finding the `gzip` binary", level=LogLevel.DEBUG)
2✔
1030
async def find_gzip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GzipBinary:
2✔
1031
    request = BinaryPathRequest(binary_name="gzip", search_path=system_binaries.system_binary_paths)
2✔
1032
    paths = await find_binary(request, **implicitly())
2✔
1033
    first_path = paths.first_path_or_raise(request, rationale="gzip file")
2✔
1034
    return GzipBinary(first_path.path, first_path.fingerprint)
2✔
1035

1036

1037
@rule(desc="Finding the `head` binary", level=LogLevel.DEBUG)
2✔
1038
async def find_head(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> HeadBinary:
2✔
1039
    request = BinaryPathRequest(binary_name="head", search_path=system_binaries.system_binary_paths)
2✔
1040
    paths = await find_binary(request, **implicitly())
2✔
1041
    first_path = paths.first_path_or_raise(request, rationale="head file")
2✔
1042
    return HeadBinary(first_path.path, first_path.fingerprint)
2✔
1043

1044

1045
@rule(desc="Finding the `id` binary", level=LogLevel.DEBUG)
2✔
1046
async def find_id(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> IdBinary:
2✔
1047
    request = BinaryPathRequest(binary_name="id", search_path=system_binaries.system_binary_paths)
2✔
1048
    paths = await find_binary(request, **implicitly())
2✔
1049
    first_path = paths.first_path_or_raise(request, rationale="id file")
2✔
1050
    return IdBinary(first_path.path, first_path.fingerprint)
2✔
1051

1052

1053
@rule(desc="Finding the `ln` binary", level=LogLevel.DEBUG)
2✔
1054
async def find_ln(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> LnBinary:
2✔
1055
    request = BinaryPathRequest(binary_name="ln", search_path=system_binaries.system_binary_paths)
2✔
1056
    paths = await find_binary(request, **implicitly())
2✔
1057
    first_path = paths.first_path_or_raise(request, rationale="link files")
2✔
1058
    return LnBinary(first_path.path, first_path.fingerprint)
2✔
1059

1060

1061
@rule(desc="Finding the `lz4` binary", level=LogLevel.DEBUG)
2✔
1062
async def find_lz4(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Lz4Binary:
2✔
1063
    request = BinaryPathRequest(binary_name="lz4", search_path=system_binaries.system_binary_paths)
×
1064
    paths = await find_binary(request, **implicitly())
×
1065
    first_path = paths.first_path_or_raise(request, rationale="lz4 file")
×
1066
    return Lz4Binary(first_path.path, first_path.fingerprint)
×
1067

1068

1069
@rule(desc="Finding the `lzop` binary", level=LogLevel.DEBUG)
2✔
1070
async def find_lzop(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> LzopBinary:
2✔
1071
    request = BinaryPathRequest(binary_name="lzop", search_path=system_binaries.system_binary_paths)
×
1072
    paths = await find_binary(request, **implicitly())
×
1073
    first_path = paths.first_path_or_raise(request, rationale="lzop file")
×
1074
    return LzopBinary(first_path.path, first_path.fingerprint)
×
1075

1076

1077
@rule(desc="Finding the `md5sum` binary", level=LogLevel.DEBUG)
2✔
1078
async def find_md5sum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Md5sumBinary:
2✔
1079
    request = BinaryPathRequest(
×
1080
        binary_name="md5sum", search_path=system_binaries.system_binary_paths
1081
    )
1082
    paths = await find_binary(request, **implicitly())
×
1083
    first_path = paths.first_path_or_raise(request, rationale="md5sum file")
×
1084
    return Md5sumBinary(first_path.path, first_path.fingerprint)
×
1085

1086

1087
@rule(desc="Finding the `mkdir` binary", level=LogLevel.DEBUG)
2✔
1088
async def find_mkdir(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MkdirBinary:
2✔
1089
    request = BinaryPathRequest(
2✔
1090
        binary_name="mkdir", search_path=system_binaries.system_binary_paths
1091
    )
1092
    paths = await find_binary(request, **implicitly())
2✔
1093
    first_path = paths.first_path_or_raise(request, rationale="create directories")
2✔
1094
    return MkdirBinary(first_path.path, first_path.fingerprint)
2✔
1095

1096

1097
@rule(desc="Finding the `mktempt` binary", level=LogLevel.DEBUG)
2✔
1098
async def find_mktemp(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MktempBinary:
2✔
1099
    request = BinaryPathRequest(
2✔
1100
        binary_name="mktemp", search_path=system_binaries.system_binary_paths
1101
    )
1102
    paths = await find_binary(request, **implicitly())
2✔
1103
    first_path = paths.first_path_or_raise(request, rationale="create temporary files/directories")
2✔
1104
    return MktempBinary(first_path.path, first_path.fingerprint)
2✔
1105

1106

1107
@rule(desc="Finding the `mv` binary", level=LogLevel.DEBUG)
2✔
1108
async def find_mv(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MvBinary:
2✔
1109
    request = BinaryPathRequest(binary_name="mv", search_path=system_binaries.system_binary_paths)
2✔
1110
    paths = await find_binary(request, **implicitly())
2✔
1111
    first_path = paths.first_path_or_raise(request, rationale="move files")
2✔
1112
    return MvBinary(first_path.path, first_path.fingerprint)
2✔
1113

1114

1115
@rule(desc="Finding the `open` binary", level=LogLevel.DEBUG)
2✔
1116
async def find_open(
2✔
1117
    platform: Platform, system_binaries: SystemBinariesSubsystem.EnvironmentAware
1118
) -> OpenBinary:
1119
    request = BinaryPathRequest(
×
1120
        binary_name=("open" if platform.is_macos else "xdg-open"),
1121
        search_path=system_binaries.system_binary_paths,
1122
    )
1123
    paths = await find_binary(request, **implicitly())
×
1124
    first_path = paths.first_path_or_raise(request, rationale="open URLs with default browser")
×
1125
    return OpenBinary(first_path.path, first_path.fingerprint)
×
1126

1127

1128
@rule(desc="Finding the `pwd` binary", level=LogLevel.DEBUG)
2✔
1129
async def find_pwd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> PwdBinary:
2✔
1130
    request = BinaryPathRequest(binary_name="pwd", search_path=system_binaries.system_binary_paths)
2✔
1131
    paths = await find_binary(request, **implicitly())
2✔
1132
    first_path = paths.first_path_or_raise(request, rationale="pwd file")
2✔
1133
    return PwdBinary(first_path.path, first_path.fingerprint)
2✔
1134

1135

1136
@rule(desc="Finding the `readlink` binary", level=LogLevel.DEBUG)
2✔
1137
async def find_readlink(
2✔
1138
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
1139
) -> ReadlinkBinary:
1140
    request = BinaryPathRequest(
×
1141
        binary_name="readlink", search_path=system_binaries.system_binary_paths
1142
    )
1143
    paths = await find_binary(request, **implicitly())
×
1144
    first_path = paths.first_path_or_raise(request, rationale="dereference symlinks")
×
1145
    return ReadlinkBinary(first_path.path, first_path.fingerprint)
×
1146

1147

1148
@rule(desc="Finding the `rm` binary", level=LogLevel.DEBUG)
2✔
1149
async def find_rm(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> RmBinary:
2✔
1150
    request = BinaryPathRequest(binary_name="rm", search_path=system_binaries.system_binary_paths)
2✔
1151
    paths = await find_binary(request, **implicitly())
2✔
1152
    first_path = paths.first_path_or_raise(request, rationale="rm file")
2✔
1153
    return RmBinary(first_path.path, first_path.fingerprint)
2✔
1154

1155

1156
@rule(desc="Finding the `sed` binary", level=LogLevel.DEBUG)
2✔
1157
async def find_sed(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> SedBinary:
2✔
1158
    request = BinaryPathRequest(binary_name="sed", search_path=system_binaries.system_binary_paths)
2✔
1159
    paths = await find_binary(request, **implicitly())
2✔
1160
    first_path = paths.first_path_or_raise(request, rationale="sed file")
2✔
1161
    return SedBinary(first_path.path, first_path.fingerprint)
2✔
1162

1163

1164
@rule(desc="Finding the `sh` binary", level=LogLevel.DEBUG)
2✔
1165
async def find_sh(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ShBinary:
2✔
1166
    request = BinaryPathRequest(binary_name="sh", search_path=system_binaries.system_binary_paths)
2✔
1167
    paths = await find_binary(request, **implicitly())
2✔
1168
    first_path = paths.first_path_or_raise(request, rationale="sh file")
2✔
1169
    return ShBinary(first_path.path, first_path.fingerprint)
2✔
1170

1171

1172
@rule(desc="Finding the `shasum` binary", level=LogLevel.DEBUG)
2✔
1173
async def find_shasum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ShasumBinary:
2✔
1174
    request = BinaryPathRequest(
×
1175
        binary_name="shasum", search_path=system_binaries.system_binary_paths
1176
    )
1177
    paths = await find_binary(request, **implicitly())
×
1178
    first_path = paths.first_path_or_raise(request, rationale="shasum file")
×
1179
    return ShasumBinary(first_path.path, first_path.fingerprint)
×
1180

1181

1182
@rule(desc="Finding the `sort` binary", level=LogLevel.DEBUG)
2✔
1183
async def find_sort(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> SortBinary:
2✔
1184
    request = BinaryPathRequest(binary_name="sort", search_path=system_binaries.system_binary_paths)
2✔
1185
    paths = await find_binary(request, **implicitly())
2✔
1186
    first_path = paths.first_path_or_raise(request, rationale="sort file")
2✔
1187
    return SortBinary(first_path.path, first_path.fingerprint)
2✔
1188

1189

1190
@rule(desc="Finding the `tail` binary", level=LogLevel.DEBUG)
2✔
1191
async def find_tail(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TailBinary:
2✔
1192
    request = BinaryPathRequest(binary_name="tail", search_path=system_binaries.system_binary_paths)
2✔
1193
    paths = await find_binary(request, **implicitly())
2✔
1194
    first_path = paths.first_path_or_raise(request, rationale="tail file")
2✔
1195
    return TailBinary(first_path.path, first_path.fingerprint)
2✔
1196

1197

1198
@rule(desc="Finding the `tar` binary", level=LogLevel.DEBUG)
2✔
1199
async def find_tar(
2✔
1200
    platform: Platform, system_binaries: SystemBinariesSubsystem.EnvironmentAware
1201
) -> TarBinary:
1202
    request = BinaryPathRequest(
2✔
1203
        binary_name="tar",
1204
        search_path=system_binaries.system_binary_paths,
1205
        test=BinaryPathTest(args=["--version"]),
1206
    )
1207
    paths = await find_binary(request, **implicitly())
2✔
1208
    first_path = paths.first_path_or_raise(
2✔
1209
        request, rationale="download the tools Pants needs to run"
1210
    )
1211
    return TarBinary(first_path.path, first_path.fingerprint, platform)
2✔
1212

1213

1214
@rule(desc="Finding the `test` binary", level=LogLevel.DEBUG)
2✔
1215
async def find_test(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TestBinary:
2✔
1216
    request = BinaryPathRequest(binary_name="test", search_path=system_binaries.system_binary_paths)
2✔
1217
    paths = await find_binary(request, **implicitly())
2✔
1218
    first_path = paths.first_path_or_raise(request, rationale="test file")
2✔
1219
    return TestBinary(first_path.path, first_path.fingerprint)
2✔
1220

1221

1222
@rule(desc="Finding the `touch` binary", level=LogLevel.DEBUG)
2✔
1223
async def find_touch(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TouchBinary:
2✔
UNCOV
1224
    request = BinaryPathRequest(
×
1225
        binary_name="touch", search_path=system_binaries.system_binary_paths
1226
    )
UNCOV
1227
    paths = await find_binary(request, **implicitly())
×
UNCOV
1228
    first_path = paths.first_path_or_raise(request, rationale="touch file")
×
UNCOV
1229
    return TouchBinary(first_path.path, first_path.fingerprint)
×
1230

1231

1232
@rule(desc="Finding the `tr` binary", level=LogLevel.DEBUG)
2✔
1233
async def find_tr(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TrBinary:
2✔
1234
    request = BinaryPathRequest(binary_name="tr", search_path=system_binaries.system_binary_paths)
2✔
1235
    paths = await find_binary(request, **implicitly())
2✔
1236
    first_path = paths.first_path_or_raise(request, rationale="tr file")
2✔
1237
    return TrBinary(first_path.path, first_path.fingerprint)
2✔
1238

1239

1240
@rule(desc="Finding the `unzip` binary", level=LogLevel.DEBUG)
2✔
1241
async def find_unzip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> UnzipBinary:
2✔
1242
    request = BinaryPathRequest(
2✔
1243
        binary_name="unzip",
1244
        search_path=system_binaries.system_binary_paths,
1245
        test=BinaryPathTest(args=["-v"]),
1246
    )
1247
    paths = await find_binary(request, **implicitly())
2✔
1248
    first_path = paths.first_path_or_raise(
2✔
1249
        request, rationale="download the tools Pants needs to run"
1250
    )
1251
    return UnzipBinary(first_path.path, first_path.fingerprint)
2✔
1252

1253

1254
@rule(desc="Finding the `wc` binary", level=LogLevel.DEBUG)
2✔
1255
async def find_wc(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> WcBinary:
2✔
1256
    request = BinaryPathRequest(binary_name="wc", search_path=system_binaries.system_binary_paths)
2✔
1257
    paths = await find_binary(request, **implicitly())
2✔
1258
    first_path = paths.first_path_or_raise(request, rationale="wc file")
2✔
1259
    return WcBinary(first_path.path, first_path.fingerprint)
2✔
1260

1261

1262
@rule(desc="Finding the `xargs` binary", level=LogLevel.DEBUG)
2✔
1263
async def find_xargs(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> XargsBinary:
2✔
1264
    request = BinaryPathRequest(
2✔
1265
        binary_name="xargs", search_path=system_binaries.system_binary_paths
1266
    )
1267
    paths = await find_binary(request, **implicitly())
2✔
1268
    first_path = paths.first_path_or_raise(request, rationale="xargs file")
2✔
1269
    return XargsBinary(first_path.path, first_path.fingerprint)
2✔
1270

1271

1272
@rule(desc="Finding the `xz` binary", level=LogLevel.DEBUG)
2✔
1273
async def find_xz(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> XzBinary:
2✔
1274
    request = BinaryPathRequest(binary_name="xz", search_path=system_binaries.system_binary_paths)
×
1275
    paths = await find_binary(request, **implicitly())
×
1276
    first_path = paths.first_path_or_raise(request, rationale="xz file")
×
1277
    return XzBinary(first_path.path, first_path.fingerprint)
×
1278

1279

1280
@rule(desc="Finding the `zip` binary", level=LogLevel.DEBUG)
2✔
1281
async def find_zip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ZipBinary:
2✔
UNCOV
1282
    request = BinaryPathRequest(
×
1283
        binary_name="zip",
1284
        search_path=system_binaries.system_binary_paths,
1285
        test=BinaryPathTest(args=["-v"]),
1286
    )
UNCOV
1287
    paths = await find_binary(request, **implicitly())
×
UNCOV
1288
    first_path = paths.first_path_or_raise(request, rationale="create `.zip` archives")
×
UNCOV
1289
    return ZipBinary(first_path.path, first_path.fingerprint)
×
1290

1291

1292
@rule(desc="Finding the `zstd` binary", level=LogLevel.DEBUG)
2✔
1293
async def find_zstd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ZstdBinary:
2✔
1294
    request = BinaryPathRequest(binary_name="zstd", search_path=system_binaries.system_binary_paths)
×
1295
    paths = await find_binary(request, **implicitly())
×
1296
    first_path = paths.first_path_or_raise(request, rationale="zstd file")
×
1297
    return ZstdBinary(first_path.path, first_path.fingerprint)
×
1298

1299

1300
def rules():
2✔
1301
    return [*collect_rules(), *python_bootstrap.rules()]
2✔
1302

1303

1304
# -------------------------------------------------------------------------------------------
1305
# Rules for fallible binaries
1306
# -------------------------------------------------------------------------------------------
1307

1308

1309
@dataclass(frozen=True)
2✔
1310
class MaybeGitBinary:
2✔
1311
    git_binary: GitBinary | None = None
2✔
1312

1313

1314
@rule(desc="Finding the `git` binary", level=LogLevel.DEBUG)
2✔
1315
async def maybe_find_git(
2✔
1316
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
1317
) -> MaybeGitBinary:
1318
    request = BinaryPathRequest(binary_name="git", search_path=system_binaries.system_binary_paths)
×
1319
    paths = await find_binary(request, **implicitly())
×
1320
    first_path = paths.first_path
×
1321
    if not first_path:
×
1322
        return MaybeGitBinary()
×
1323

1324
    return MaybeGitBinary(GitBinary(first_path.path, first_path.fingerprint))
×
1325

1326

1327
class MaybeGitBinaryRequest:
2✔
1328
    pass
2✔
1329

1330

1331
@rule
2✔
1332
async def maybe_find_git_wrapper(
2✔
1333
    _: MaybeGitBinaryRequest, maybe_git_binary: MaybeGitBinary
1334
) -> MaybeGitBinary:
1335
    return maybe_git_binary
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc