• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25443604553

06 May 2026 03:05PM UTC coverage: 92.879% (-0.04%) from 92.915%
25443604553

push

github

web-flow
[pants_ng] Scaffolding for a pants_ng mode. (#23319)

In this mode the command line is parsed as an
NG invocation, and dispatched appropriately.

Of course at the moment there are no
implementations to dispatch to. That will follow.

This does expose a new option, `pants_ng` to users. 
There is a big warning not to set it, but we're not trying
to hide that we're working on a new thing, so I am
comfortable with this.

25 of 76 new or added lines in 9 files covered. (32.89%)

1294 existing lines in 76 files now uncovered.

92234 of 99306 relevant lines covered (92.88%)

4.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.97
/src/python/pants/core/util_rules/system_binaries.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import dataclasses
12✔
7
import hashlib
12✔
8
import logging
12✔
9
import os
12✔
10
import shlex
12✔
11
import subprocess
12✔
12
from collections.abc import Iterable, Mapping, Sequence
12✔
13
from dataclasses import dataclass
12✔
14
from enum import Enum
12✔
15
from itertools import groupby
12✔
16
from textwrap import dedent  # noqa: PNT20
12✔
17
from typing import Self
12✔
18

19
from pants.core.environments.target_types import EnvironmentTarget
12✔
20
from pants.core.subsystems import python_bootstrap
12✔
21
from pants.engine.collection import DeduplicatedCollection
12✔
22
from pants.engine.engine_aware import EngineAwareReturnType
12✔
23
from pants.engine.fs import CreateDigest, FileContent, PathMetadataRequest
12✔
24
from pants.engine.internals.native_engine import Digest, PathMetadataKind, PathNamespace
12✔
25
from pants.engine.internals.selectors import concurrently
12✔
26
from pants.engine.intrinsics import create_digest, execute_process, path_metadata_request
12✔
27
from pants.engine.platform import Platform
12✔
28
from pants.engine.process import Process, execute_process_or_raise
12✔
29
from pants.engine.rules import collect_rules, implicitly, rule
12✔
30
from pants.option.option_types import StrListOption
12✔
31
from pants.option.subsystem import Subsystem
12✔
32
from pants.util.frozendict import FrozenDict
12✔
33
from pants.util.logging import LogLevel
12✔
34
from pants.util.memo import memoized_property
12✔
35
from pants.util.ordered_set import OrderedSet
12✔
36
from pants.util.strutil import pluralize, softwrap
12✔
37

38
logger = logging.getLogger(__name__)
12✔
39

40

41
class SystemBinariesSubsystem(Subsystem):
    """Options controlling where system binaries are searched for."""

    options_scope = "system-binaries"
    help = "System binaries related settings."

    class EnvironmentAware(Subsystem.EnvironmentAware):
        # `PATH` is read from the options environment when expanding the `<PATH>` marker below.
        env_vars_used_by_options = ("PATH",)

        _DEFAULT_SEARCH_PATHS = ("/usr/bin", "/bin", "/usr/local/bin", "/opt/homebrew/bin")

        _system_binary_paths = StrListOption(
            default=[*_DEFAULT_SEARCH_PATHS],
            help=softwrap(
                """
                The PATH value that will be searched for executables.

                The special string `"<PATH>"` will expand to the contents of the PATH env var.
                """
            ),
        )

        @memoized_property
        def system_binary_paths(self) -> SearchPath:
            """Return the configured search path with any `<PATH>` marker expanded.

            Each `<PATH>` entry is replaced, in place, by the `os.pathsep`-separated components
            of the `PATH` env var (nothing is emitted if `PATH` is unset or empty). Components
            that start with `$` or `~` are skipped with a warning; they are treated as
            unexpanded shell syntax rather than usable directories. All other entries are
            passed through unchanged.
            """

            def iter_path_entries():
                for entry in self._system_binary_paths:
                    if entry == "<PATH>":
                        path = self._options_env.get("PATH")
                        if path:
                            for prefix in path.split(os.pathsep):
                                if prefix.startswith(("$", "~")):
                                    logger.warning(
                                        f"Ignored unexpanded path prefix `{prefix}` in the `PATH` "
                                        "environment variable while processing the `<PATH>` marker "
                                        "from the `[system-binaries].system_binary_paths` option. "
                                        "Please check the value of the `PATH` environment variable "
                                        "in your shell."
                                    )
                                else:
                                    yield prefix
                    else:
                        yield entry

            return SearchPath(iter_path_entries())
12✔
79

80

81
# -------------------------------------------------------------------------------------------
82
# `BinaryPath` types
83
# -------------------------------------------------------------------------------------------
84

85

86
@dataclass(frozen=True)
12✔
87
class BinaryPath:
12✔
88
    path: str
12✔
89
    fingerprint: str
12✔
90

91
    def __init__(self, path: str, fingerprint: str | None = None) -> None:
12✔
92
        object.__setattr__(self, "path", path)
12✔
93
        object.__setattr__(
12✔
94
            self, "fingerprint", self._fingerprint() if fingerprint is None else fingerprint
95
        )
96

97
    @staticmethod
12✔
98
    def _fingerprint(content: bytes | bytearray | memoryview | None = None) -> str:
12✔
99
        hasher = hashlib.sha256() if content is None else hashlib.sha256(content)
12✔
100
        return hasher.hexdigest()
12✔
101

102
    @classmethod
12✔
103
    def fingerprinted(
12✔
104
        cls, path: str, representative_content: bytes | bytearray | memoryview
105
    ) -> Self:
106
        return cls(path, fingerprint=cls._fingerprint(representative_content))
12✔
107

108

109
@dataclass(unsafe_hash=True)
class BinaryPathTest:
    """Arguments used to test-run a candidate binary, and how to fingerprint it.

    `args` is normalized to a tuple so that instances are hashable. `fingerprint_stdout`
    selects whether the stdout of the test run is hashed to form the fingerprint.
    """

    args: tuple[str, ...]
    fingerprint_stdout: bool

    def __init__(self, args: Iterable[str], fingerprint_stdout: bool = True) -> None:
        # The dataclass is not frozen, so plain attribute assignment is sufficient.
        self.args = tuple(args)
        self.fingerprint_stdout = fingerprint_stdout
12✔
117

118

119
class SearchPath(DeduplicatedCollection[str]):
    """The search path for binaries; i.e.: the $PATH.

    A collection of string path entries built on `DeduplicatedCollection`.
    """
121

122

123
@dataclass(unsafe_hash=True)
class BinaryPathRequest:
    """Request to find a binary of a given name.

    With `check_file_entries=True`, entries of `search_path` that are file paths are considered
    in addition to the traditional directory entries.

    When a `test` is given, every binary found is executed with the test args, and only those
    whose test run exits with return code 0 are retained. If the test run produces stdout, that
    output is used to fingerprint the binary path so that upgrades and downgrades can be
    detected. A reasonable test for many programs is `BinaryPathTest(args=["--version"])`: it
    both proves the program runs and yields stdout text that changes when the binary at the
    discovered path is upgraded or downgraded.
    """

    search_path: SearchPath
    binary_name: str
    check_file_entries: bool
    test: BinaryPathTest | None

    def __init__(
        self,
        *,
        search_path: Iterable[str],
        binary_name: str,
        check_file_entries: bool = False,
        test: BinaryPathTest | None = None,
    ) -> None:
        # Any iterable of entries is accepted and normalized into a `SearchPath`.
        # The dataclass is not frozen, so plain attribute assignment is sufficient.
        self.search_path = SearchPath(search_path)
        self.binary_name = binary_name
        self.check_file_entries = check_file_entries
        self.test = test
12✔
156

157

158
@dataclass(frozen=True)
class BinaryPaths(EngineAwareReturnType):
    """The de-duplicated, order-preserving result of searching for `binary_name`."""

    binary_name: str
    paths: tuple[BinaryPath, ...]

    def __init__(self, binary_name: str, paths: Iterable[BinaryPath] | None = None):
        # `OrderedSet` drops duplicates while keeping first-seen order.
        unique_paths = tuple(OrderedSet(paths)) if paths else ()
        object.__setattr__(self, "binary_name", binary_name)
        object.__setattr__(self, "paths", unique_paths)

    def message(self) -> str:
        """Summarize the outcome: not found, or the first hit plus a count of the others."""
        if not self.paths:
            return f"failed to find {self.binary_name}"
        first, *others = self.paths
        summary = f"found {self.binary_name} at {first}"
        if others:
            summary = f"{summary} and {pluralize(len(others), 'other location')}"
        return summary

    @property
    def first_path(self) -> BinaryPath | None:
        """Return the first path to the binary that was discovered, or None if there is none."""
        return self.paths[0] if self.paths else None

    def first_path_or_raise(self, request: BinaryPathRequest, *, rationale: str) -> BinaryPath:
        """Return the first discovered path, raising `BinaryNotFoundError` if there is none.

        :param request: The request that produced these paths; used to build the error.
        :param rationale: A short description of why the binary is needed, for the error.
        """
        found = self.first_path
        if found:
            return found
        raise BinaryNotFoundError.from_request(request, rationale=rationale)
12✔
186

187

188
class BinaryNotFoundError(EnvironmentError):
    """Raised when a required binary cannot be found on the requested search path."""

    @classmethod
    def from_request(
        cls,
        request: BinaryPathRequest,
        *,
        rationale: str | None = None,
        alternative_solution: str | None = None,
    ) -> BinaryNotFoundError:
        """When no binary is found via `BinaryPaths`, and it is not recoverable.

        :param request: The request whose `binary_name` and `search_path` are reported.
        :param rationale: A short description of why this binary is needed, e.g.
            "download the tools Pants needs" or "run Python programs".
        :param alternative_solution: A description of what else users can do to fix the issue,
            beyond installing the program. For example, "Alternatively, you can set the option
            `--python-bootstrap-search-path` to change the paths searched."
        :return: An instance of `cls` (so subclasses get their own type) carrying the message.
        """
        msg = softwrap(
            f"""
            Cannot find `{request.binary_name}` on `{sorted(request.search_path)}`.
            Please ensure that it is installed
            """
        )
        msg += f" so that Pants can {rationale}." if rationale else "."
        if alternative_solution:
            msg += f"\n\n{alternative_solution}"
        # Construct via `cls`, not the hard-coded base class, so that calling this on a
        # subclass yields an instance of that subclass.
        return cls(msg)
3✔
215

216

217
# -------------------------------------------------------------------------------------------
218
# Binary shims
219
# Creates a Digest with a shim for each requested binary in a directory suitable for PATH.
220
# -------------------------------------------------------------------------------------------
221

222

223
@dataclass(frozen=True)
class BinaryShimsRequest:
    """Request to create shims for one or more system binaries.

    Binaries may be supplied as already-resolved `paths`, as `requests` that still need to be
    looked up, or both.
    """

    # Why the binaries are needed. Excluded from equality comparison.
    rationale: str = dataclasses.field(compare=False)

    # Binaries whose paths are already known.
    paths: tuple[BinaryPath, ...] = ()

    # Binaries that must first be looked up by name.
    requests: tuple[BinaryPathRequest, ...] = ()

    @classmethod
    def for_binaries(
        cls, *names: str, rationale: str, search_path: Sequence[str]
    ) -> BinaryShimsRequest:
        """Request shims for the named binaries, each looked up on `search_path`."""
        lookups = [
            BinaryPathRequest(binary_name=name, search_path=search_path) for name in names
        ]
        return cls(rationale=rationale, requests=tuple(lookups))

    @classmethod
    def for_paths(
        cls,
        *paths: BinaryPath,
        rationale: str,
    ) -> BinaryShimsRequest:
        """Request shims for already-resolved binary paths.

        Exact duplicates are dropped and the result is sorted by path, giving a stable order
        for better caching.

        :raises ValueError: if two distinct entries share the same `path`.
        """
        ordered = tuple(sorted(set(paths), key=lambda entry: entry.path))

        # After de-duplication, a path that appears twice belongs to entries that differ in
        # some other field, i.e. mismatched content.
        seen_paths: set[str] = set()
        clashing_paths: set[str] = set()
        for entry in ordered:
            if entry.path in seen_paths:
                clashing_paths.add(entry.path)
            else:
                seen_paths.add(entry.path)

        if clashing_paths:
            raise ValueError(
                "Detected duplicate paths with mismatched content at paths: "
                f"{', '.join(sorted(clashing_paths))}"
            )

        return cls(rationale=rationale, paths=ordered)
272

273

274
@dataclass(frozen=True)
class BinaryShims:
    """The shims created for a `BinaryShimsRequest`, held in `digest`.

    A `Process` can run with `immutable_input_digests` passed to it and with `path_component`
    included in its `PATH` environment variable.

    The alternative is to add the directories hosting the binaries directly, but that would
    make many more unrelated binaries executable from PATH, leaking into the sandbox
    unnecessarily.
    """

    digest: Digest
    cache_name: str

    @property
    def immutable_input_digests(self) -> Mapping[str, Digest]:
        """A single-entry mapping from `cache_name` to the shims `digest`."""
        mount_name, shims = self.cache_name, self.digest
        return FrozenDict({mount_name: shims})

    @property
    def path_component(self) -> str:
        """The `PATH` entry for the shims: `cache_name` under the `{chroot}` placeholder."""
        chroot_placeholder = "{chroot}"
        return os.path.join(chroot_placeholder, self.cache_name)
11✔
296

297

298
# -------------------------------------------------------------------------------------------
299
# Binaries
300
# -------------------------------------------------------------------------------------------
301

302

303
class BashBinary(BinaryPath):
    """The `bash` binary."""

    # A fixed set of directories to search for `bash`.
    # NOTE(review): the `get_bash` rule in this file searches the
    # `[system-binaries].system_binary_paths` option rather than this constant — confirm which
    # callers still rely on it.
    DEFAULT_SEARCH_PATH = SearchPath(("/usr/bin", "/bin", "/usr/local/bin"))
12✔
307

308

309
class RealpathBinary(BinaryPath):
    """The `realpath` binary, as returned by the `find_realpath` rule."""

    pass
12✔
311

312

313
# Note that updating this will impact the `archive` target defined in `core/target_types.py`.
314
class ArchiveFormat(Enum):
    """The archive formats known to this module; each value is the format's string name.

    `TarBinary.create_archive_argv` maps `TGZ`, `TBZ2` and `TXZ` to a tar compression flag and
    treats every other member as uncompressed.
    """

    TAR = "tar"
    TGZ = "tar.gz"
    TBZ2 = "tar.bz2"
    TXZ = "tar.xz"
    ZIP = "zip"
12✔
320

321

322
class ZipBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `zip` executable (confirm)."""

    pass
12✔
324

325

326
class UnzipBinary(BinaryPath):
    """A `BinaryPath` subtype that can build an argv for extracting a zip archive."""

    def extract_archive_argv(self, archive_path: str, extract_path: str) -> tuple[str, ...]:
        """Return the argv `(<this binary>, archive_path, "-d", extract_path)`."""
        # Note that `extract_path` does not need to already exist.
        # The caller should validate that it's a valid `.zip` file.
        return (self.path, archive_path, "-d", extract_path)
12✔
331

332

333
@dataclass(frozen=True)
class TarBinary(BinaryPath):
    """A `BinaryPath` subtype that builds argvs for creating and extracting tar archives."""

    platform: Platform

    def create_archive_argv(
        self,
        output_filename: str,
        tar_format: ArchiveFormat,
        *,
        input_files: Sequence[str] = (),
        input_file_list_filename: str | None = None,
    ) -> tuple[str, ...]:
        """Return the argv that creates `output_filename` in the given `tar_format`.

        `input_files` are listed directly on the command line. If `input_file_list_filename`
        is given, `--files-from <filename>` is appended after them.

        Note that the parent directory of `output_filename` must already exist.
        """
        # We do not use `-a` (auto-set compression) because it does not work with older tar
        # versions. Not all tar implementations will support these compression formats - in that
        # case, the user will need to choose a different format.
        compression_flags = {
            ArchiveFormat.TGZ: "z",
            ArchiveFormat.TBZ2: "j",
            ArchiveFormat.TXZ: "J",
        }
        # Formats without an entry get no compression flag.
        mode = f"c{compression_flags.get(tar_format, '')}f"

        argv = [self.path, mode, output_filename, *input_files]
        if input_file_list_filename:
            argv += ["--files-from", input_file_list_filename]
        return tuple(argv)

    def extract_archive_argv(
        self, archive_path: str, extract_path: str, *, archive_suffix: str
    ) -> tuple[str, ...]:
        """Return the argv that extracts `archive_path` into `extract_path`.

        Note that `extract_path` must already exist, and the caller should validate that the
        archive is a valid `.tar` file.
        """
        argv = [self.path]
        # `.tar.lz4` archives get an explicit `-Ilz4` program argument, except on macOS.
        if archive_suffix == ".tar.lz4" and not self.platform.is_macos:
            argv.append("-Ilz4")
        argv += ["-xf", archive_path, "-C", extract_path]
        return tuple(argv)
12✔
366

367

368
class AwkBinary(BinaryPath):
    """The `awk` binary, as returned by the `find_awk` rule."""

    pass
12✔
370

371

372
class Base64Binary(BinaryPath):
    """The `base64` binary, as returned by the `find_base64` rule."""

    pass
12✔
374

375

376
class BasenameBinary(BinaryPath):
    """The `basename` binary, as returned by the `find_basename` rule."""

    pass
12✔
378

379

380
class Bzip2Binary(BinaryPath):
    """The `bzip2` binary, as returned by the `find_bzip2` rule."""

    pass
12✔
382

383

384
class Bzip3Binary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `bzip3` executable (confirm)."""

    pass
12✔
386

387

388
class CatBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `cat` executable (confirm)."""

    pass
12✔
390

391

392
class ChmodBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `chmod` executable (confirm)."""

    pass
12✔
394

395

396
class CksumBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `cksum` executable (confirm)."""

    pass
12✔
398

399

400
class CpBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `cp` executable (confirm)."""

    pass
12✔
402

403

404
class CutBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `cut` executable (confirm)."""

    pass
12✔
406

407

408
class DateBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `date` executable (confirm)."""

    pass
12✔
410

411

412
class DdBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `dd` executable (confirm)."""

    pass
12✔
414

415

416
class DfBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `df` executable (confirm)."""

    pass
12✔
418

419

420
class DiffBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `diff` executable (confirm)."""

    pass
12✔
422

423

424
class DirnameBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `dirname` executable (confirm)."""

    pass
12✔
426

427

428
class DuBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `du` executable (confirm)."""

    pass
12✔
430

431

432
class ExprBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `expr` executable (confirm)."""

    pass
12✔
434

435

436
class FindBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `find` executable (confirm)."""

    pass
12✔
438

439

440
class GetentBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `getent` executable (confirm)."""

    pass
12✔
442

443

444
class GpgBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `gpg` executable (confirm)."""

    pass
12✔
446

447

448
class GzipBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `gzip` executable (confirm)."""

    pass
12✔
450

451

452
class HeadBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `head` executable (confirm)."""

    pass
12✔
454

455

456
class IdBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `id` executable (confirm)."""

    pass
12✔
458

459

460
class LnBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `ln` executable (confirm)."""

    pass
12✔
462

463

464
class Lz4Binary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `lz4` executable (confirm)."""

    pass
12✔
466

467

468
class LzopBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `lzop` executable (confirm)."""

    pass
12✔
470

471

472
class Md5sumBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `md5sum` executable (confirm)."""

    pass
12✔
474

475

476
class MkdirBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `mkdir` executable (confirm)."""

    pass
12✔
478

479

480
class MktempBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `mktemp` executable (confirm)."""

    pass
12✔
482

483

484
class MvBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `mv` executable (confirm)."""

    pass
12✔
486

487

488
class OpenBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `open` executable (confirm)."""

    pass
12✔
490

491

492
class PwdBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `pwd` executable (confirm)."""

    pass
12✔
494

495

496
class ReadlinkBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `readlink` executable (confirm)."""

    pass
12✔
498

499

500
class RmBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `rm` executable (confirm)."""

    pass
12✔
502

503

504
class SedBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `sed` executable (confirm)."""

    pass
12✔
506

507

508
class ShBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `sh` executable (confirm)."""

    pass
12✔
510

511

512
class ShasumBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `shasum` executable (confirm)."""

    pass
12✔
514

515

516
class SortBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `sort` executable (confirm)."""

    pass
12✔
518

519

520
class TailBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `tail` executable (confirm)."""

    pass
12✔
522

523

524
class TestBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `test` executable (confirm)."""

    pass
12✔
526

527

528
class TouchBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `touch` executable (confirm)."""

    pass
12✔
530

531

532
class TrBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `tr` executable (confirm)."""

    pass
12✔
534

535

536
class WcBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `wc` executable (confirm)."""

    pass
12✔
538

539

540
class XargsBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `xargs` executable (confirm)."""

    pass
12✔
542

543

544
class XzBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `xz` executable (confirm)."""

    pass
12✔
546

547

548
class ZstdBinary(BinaryPath):
    """A `BinaryPath` subtype adding no fields; presumably the `zstd` executable (confirm)."""

    pass
12✔
550

551

552
class GitBinaryException(Exception):
    """Raised by `GitBinary` when a git command cannot be started or exits unsuccessfully."""

    pass
12✔
554

555

556
class GitBinary(BinaryPath):
    """A `BinaryPath` subtype with helpers for running git directly as a subprocess."""

    def _invoke_unsandboxed(self, cmd: list[str]) -> bytes:
        """Invoke the given git command, _without_ the sandboxing provided by the `Process` API.

        This API is for internal use only: users should prefer to consume methods of the
        `GitWorktree` class.

        :param cmd: The git arguments, without the leading path to the git binary.
        :return: The command's stdout with surrounding whitespace stripped.
        :raises GitBinaryException: If the process cannot be started, or if it exits with a
            status that `_check_result` treats as a failure.
        """
        cmd = [self.path, *cmd]

        self._log_call(cmd)

        try:
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            # Binary DNE or is not executable
            cmd_str = shlex.join(cmd)
            # Chain the original error so the underlying OS failure stays in the traceback.
            raise GitBinaryException(f"Failed to execute command {cmd_str}: {e!r}") from e
        out, err = process.communicate()

        # stderr is decoded even when the command succeeded, so undecodable bytes must not
        # raise here; they are replaced instead.
        self._check_result(cmd, process.returncode, err.decode(errors="replace"))

        return out.strip()

    def _check_result(self, cmd: list[str], result: int, failure_msg: str | None = None) -> None:
        """Raise `GitBinaryException` if exit code `result` indicates failure.

        An exit code of 1 is tolerated when "diff" is one of the arguments. If `failure_msg`
        is empty or None, a generic message naming the command and exit code is used.
        """
        # git diff --exit-code exits with 1 if there were differences.
        if result != 0 and (result != 1 or "diff" not in cmd):
            cmd_str = shlex.join(cmd)
            raise GitBinaryException(failure_msg or f"{cmd_str} failed with exit code {result}")

    def _log_call(self, cmd: Iterable[str]) -> None:
        """Log the command at debug level, with its arguments joined by spaces."""
        # Lazy %-formatting: the message is only assembled if debug logging is enabled.
        logger.debug("Executing: %s", " ".join(cmd))
2✔
587

588

589
# -------------------------------------------------------------------------------------------
590
# Binaries Rules
591
# -------------------------------------------------------------------------------------------
592

593

594
@rule(desc="Finding the `bash` binary", level=LogLevel.DEBUG)
async def get_bash(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> BashBinary:
    """Locate `bash` on the configured system binary search path.

    Candidates are test-run with `--version`; the first one that `find_binary` retains is
    returned.

    :raises BinaryNotFoundError: if no candidate is retained.
    """
    search_path = system_binaries.system_binary_paths

    request = BinaryPathRequest(
        binary_name="bash",
        search_path=search_path,
        test=BinaryPathTest(args=["--version"]),
    )
    paths = await find_binary(request, **implicitly())
    first_path = paths.first_path
    if not first_path:
        # No rationale is passed, so the error message is the generic "ensure it is installed".
        raise BinaryNotFoundError.from_request(request)
    return BashBinary(first_path.path, first_path.fingerprint)
12✔
608

609

610
async def _find_candidate_paths_via_path_metadata_lookups(
    request: BinaryPathRequest,
) -> tuple[str, ...]:
    """Find candidate paths for `request.binary_name` using path metadata lookups.

    Each search path entry is made absolute and looked up in the `SYSTEM` namespace. For a
    directory or symlink entry, `<entry>/<binary_name>` is then looked up and kept if metadata
    exists for it. A file entry is kept as-is, but only when `request.check_file_entries` is
    set. Results are returned in search path order.

    NOTE(review): unlike the shell script in `_find_candidate_paths_via_subprocess_helper`, no
    basename match or executable-bit check is visible here — confirm whether that is intended.
    """
    search_path = [os.path.abspath(path) for path in request.search_path]

    # First pass: fetch metadata for every search path entry.
    metadata_results = await concurrently(
        path_metadata_request(PathMetadataRequest(path=path, namespace=PathNamespace.SYSTEM))
        for path in search_path
    )

    # Holds, in search path order, either a path that is already resolved (a file entry) or
    # a pending request for `<dir>/<binary_name>`, so the final result keeps that order.
    found_paths_and_requests: list[str | PathMetadataRequest] = []
    file_metadata_requests: list[PathMetadataRequest] = []

    for metadata_result in metadata_results:
        metadata = metadata_result.metadata
        if not metadata:
            # No metadata for this entry: skip it.
            continue

        if metadata.kind in (PathMetadataKind.DIRECTORY, PathMetadataKind.SYMLINK):
            file_metadata_request = PathMetadataRequest(
                path=os.path.join(metadata.path, request.binary_name),
                namespace=PathNamespace.SYSTEM,
            )
            found_paths_and_requests.append(file_metadata_request)
            file_metadata_requests.append(file_metadata_request)

        elif metadata.kind == PathMetadataKind.FILE and request.check_file_entries:
            found_paths_and_requests.append(metadata.path)

    # Second pass: look up the binary inside each directory-like entry.
    file_metadata_results = await concurrently(
        path_metadata_request(file_metadata_request)
        for file_metadata_request in file_metadata_requests
    )
    file_metadata_results_by_request = dict(zip(file_metadata_requests, file_metadata_results))

    # Resolve the pending requests in order, dropping those with no metadata.
    found_paths: list[str] = []
    for found_path_or_request in found_paths_and_requests:
        if isinstance(found_path_or_request, str):
            found_paths.append(found_path_or_request)
        else:
            file_metadata_result = file_metadata_results_by_request[found_path_or_request]
            file_metadata = file_metadata_result.metadata
            if not file_metadata:
                continue
            found_paths.append(file_metadata.path)

    return tuple(found_paths)
12✔
657

658

659
async def _find_candidate_paths_via_subprocess_helper(
    request: BinaryPathRequest, env_target: EnvironmentTarget
) -> tuple[str, ...]:
    """Find candidate paths for `request.binary_name` by running a bash script as a `Process`.

    The script walks the entries of its `PATH` (set to the request's search path) and prints
    each candidate that is a regular, executable file whose basename equals the requested
    name. File entries are only considered when `request.check_file_entries` is set.

    :return: One path per line of the script's stdout, in the order printed.
    """
    # If we are not already locating bash, recurse to locate bash to use it as an absolute path in
    # our shebang. This avoids mixing locations that we would search for bash into the search paths
    # of the request we are servicing.
    # TODO(#10769): Replace this script with a statically linked native binary so we don't
    #  depend on either /bin/bash being available on the Process host.
    if request.binary_name == "bash":
        shebang = "#!/usr/bin/env bash"
    else:
        bash = await get_bash(**implicitly())
        shebang = f"#!{bash.path}"

    # HACK: For workspace environments, the `find_binary.sh` will be mounted in the "chroot" directory
    # which is not the current directory (since the process will execute in the workspace). Thus,
    # adjust the path used as argv[0] to find the script.
    script_name = "find_binary.sh"
    sandbox_base_path = env_target.sandbox_base_path()
    script_exec_path = (
        f"./{script_name}" if not sandbox_base_path else f"{sandbox_base_path}/{script_name}"
    )

    # The header is an f-string (it interpolates the shebang and the flag); the body is a plain
    # string so that its `${...}` shell expansions need no escaping.
    script_header = dedent(
        f"""\
        {shebang}

        set -euox pipefail

        CHECK_FILE_ENTRIES={"1" if request.check_file_entries else ""}
        """
    )
    script_body = dedent(
        """\
        for path in ${PATH//:/ }; do
            if [[ -d "${path}" ]]; then
                # Handle traditional directory PATH element.
                maybe_exe="${path}/$1"
            elif [[ -n "${CHECK_FILE_ENTRIES}" ]]; then
                # Handle PATH elements that are filenames to allow for precise selection.
                maybe_exe="${path}"
            else
                maybe_exe=
            fi
            if [[ "$1" == "${maybe_exe##*/}" && -f "${maybe_exe}" && -x "${maybe_exe}" ]]
            then
                echo "${maybe_exe}"
            fi
        done
        """
    )
    script_content = script_header + script_body
    script_digest = await create_digest(
        CreateDigest([FileContent(script_name, script_content.encode(), is_executable=True)]),
    )

    # Some subtle notes about executing this script:
    #
    #  - We run the script with `ProcessResult` instead of `FallibleProcessResult` so that we
    #      can catch bugs in the script itself, given an earlier silent failure.
    search_path = os.pathsep.join(request.search_path)
    result = await execute_process_or_raise(
        **implicitly(
            Process(
                description=f"Searching for `{request.binary_name}` on PATH={search_path}",
                level=LogLevel.DEBUG,
                input_digest=script_digest,
                argv=[script_exec_path, request.binary_name],
                env={"PATH": search_path},
                cache_scope=env_target.executable_search_path_cache_scope(),
            ),
        )
    )
    return tuple(result.stdout.decode().splitlines())
1✔
733

734

735
@rule
async def find_binary(
    request: BinaryPathRequest,
    env_target: EnvironmentTarget,
) -> BinaryPaths:
    """Find all paths for the requested binary, optionally filtered by a test run.

    Candidates come from path metadata lookups when the environment can access local system
    paths, and from the subprocess helper script otherwise.

    Without `request.test`, every candidate is returned with the default fingerprint. With
    a test, each candidate is run with the test args and only those exiting with code 0 are
    kept. Their fingerprint is the hash of the test's stdout when `fingerprint_stdout` is set,
    and the decoded stdout itself otherwise.
    """
    found_paths: tuple[str, ...]
    if env_target.can_access_local_system_paths:
        found_paths = await _find_candidate_paths_via_path_metadata_lookups(request)
    else:
        found_paths = await _find_candidate_paths_via_subprocess_helper(request, env_target)

    if not request.test:
        return BinaryPaths(
            binary_name=request.binary_name,
            paths=(BinaryPath(path) for path in found_paths),
        )

    # Run the test against every candidate concurrently; results line up with `found_paths`.
    results = await concurrently(
        execute_process(
            Process(
                description=f"Test binary {path}.",
                level=LogLevel.DEBUG,
                argv=[path, *request.test.args],
                # NB: Since a failure is a valid result for this script, we always cache it,
                # regardless of success or failure.
                cache_scope=env_target.executable_search_path_cache_scope(cache_failures=True),
            ),
            **implicitly(),
        )
        for path in found_paths
    )
    return BinaryPaths(
        binary_name=request.binary_name,
        paths=[
            (
                BinaryPath.fingerprinted(path, result.stdout)
                if request.test.fingerprint_stdout
                else BinaryPath(path, result.stdout.decode())
            )
            for path, result in zip(found_paths, results)
            if result.exit_code == 0
        ],
    )
778

779

780
@rule
async def create_binary_shims(
    binary_shims_request: BinaryShimsRequest,
    bash: BashBinary,
) -> BinaryShims:
    """Creates a bin directory with shims for all requested binaries.

    This can be provided to a `Process` as an `immutable_input_digest`, or can be merged into the
    input digest.

    Binaries given by name in `requests` are looked up first, and the first path found for each
    is appended to the explicitly provided `paths`. Each shim is an executable bash script,
    named after the basename of the binary's path, that `exec`s the binary with all arguments
    forwarded.

    :raises BinaryNotFoundError: if a requested binary cannot be found.
    """

    paths = binary_shims_request.paths
    requests = binary_shims_request.requests
    if requests:
        all_binary_paths = await concurrently(
            find_binary(request, **implicitly()) for request in requests
        )
        first_paths = tuple(
            binary_paths.first_path_or_raise(request, rationale=binary_shims_request.rationale)
            for binary_paths, request in zip(all_binary_paths, requests)
        )
        # Tuples are immutable: this rebinds the local `paths` and leaves the request untouched.
        paths += first_paths

    def _create_shim(bash: str, binary: str) -> bytes:
        """The binary shim script to be placed in the output directory for the digest."""
        return dedent(
            f"""\
            #!{bash}
            exec "{binary}" "$@"
            """
        ).encode()

    scripts = [
        FileContent(
            os.path.basename(path.path), _create_shim(bash.path, path.path), is_executable=True
        )
        for path in paths
    ]

    digest = await create_digest(CreateDigest(scripts))
    # The name is derived from the digest fingerprint, so identical shim sets share a name.
    cache_name = f"_binary_shims_{digest.fingerprint}"

    return BinaryShims(digest, cache_name)
11✔
823

824

825
@rule(desc="Finding the `realpath` binary", level=LogLevel.DEBUG)
async def find_realpath(
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
) -> RealpathBinary:
    """Locate `realpath` on the configured system binary search path.

    :raises BinaryNotFoundError: if `realpath` is not found.
    """
    request = BinaryPathRequest(
        binary_name="realpath", search_path=system_binaries.system_binary_paths
    )
    paths = await find_binary(request, **implicitly())
    first_path = paths.first_path_or_raise(request, rationale="realpath file")
    return RealpathBinary(first_path.path, first_path.fingerprint)
1✔
835

836

837
@rule(desc="Finding the `awk` binary", level=LogLevel.DEBUG)
async def find_awk(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> AwkBinary:
    """Locate `awk` on the configured system binary search path.

    :raises BinaryNotFoundError: if `awk` is not found.
    """
    request = BinaryPathRequest(binary_name="awk", search_path=system_binaries.system_binary_paths)
    paths = await find_binary(request, **implicitly())
    first_path = paths.first_path_or_raise(request, rationale="awk file")
    return AwkBinary(first_path.path, first_path.fingerprint)
3✔
843

844

845
@rule(desc="Finding the `base64` binary", level=LogLevel.DEBUG)
async def find_base64(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Base64Binary:
    """Resolve `base64` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="base64",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="base64 file")
    return Base64Binary(chosen.path, chosen.fingerprint)
×
853

854

855
@rule(desc="Finding the `basename` binary", level=LogLevel.DEBUG)
async def find_basename(
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
) -> BasenameBinary:
    """Resolve `basename` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="basename",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="basename file")
    return BasenameBinary(chosen.path, chosen.fingerprint)
3✔
865

866

867
@rule(desc="Finding the `bzip2` binary", level=LogLevel.DEBUG)
async def find_bzip2(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Bzip2Binary:
    """Resolve `bzip2` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="bzip2",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="bzip2 file")
    return Bzip2Binary(chosen.path, chosen.fingerprint)
×
875

876

877
@rule(desc="Finding the `bzip3` binary", level=LogLevel.DEBUG)
async def find_bzip3(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Bzip3Binary:
    """Resolve `bzip3` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="bzip3",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="bzip3 file")
    return Bzip3Binary(chosen.path, chosen.fingerprint)
×
885

886

887
@rule(desc="Finding the `cat` binary", level=LogLevel.DEBUG)
async def find_cat(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CatBinary:
    """Resolve `cat` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="cat",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="outputting content from files")
    return CatBinary(chosen.path, chosen.fingerprint)
7✔
893

894

895
@rule(desc="Finding the `chmod` binary", level=LogLevel.DEBUG)
async def find_chmod(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ChmodBinary:
    """Resolve `chmod` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="chmod",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    why_needed = "change file modes or Access Control Lists"
    chosen = candidates.first_path_or_raise(lookup, rationale=why_needed)
    return ChmodBinary(chosen.path, chosen.fingerprint)
3✔
905

906

907
@rule(desc="Finding the `cksum` binary", level=LogLevel.DEBUG)
async def find_cksum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CksumBinary:
    """Resolve `cksum` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="cksum",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="cksum file")
    return CksumBinary(chosen.path, chosen.fingerprint)
3✔
915

916

917
@rule(desc="Finding the `cp` binary", level=LogLevel.DEBUG)
async def find_cp(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CpBinary:
    """Resolve `cp` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="cp",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="copy files")
    return CpBinary(chosen.path, chosen.fingerprint)
6✔
923

924

925
@rule(desc="Finding the `cut` binary", level=LogLevel.DEBUG)
async def find_cut(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> CutBinary:
    """Resolve `cut` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="cut",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="cut file")
    return CutBinary(chosen.path, chosen.fingerprint)
3✔
931

932

933
@rule(desc="Finding the `date` binary", level=LogLevel.DEBUG)
async def find_date(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DateBinary:
    """Resolve `date` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="date",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="date file")
    return DateBinary(chosen.path, chosen.fingerprint)
3✔
939

940

941
@rule(desc="Finding the `dd` binary", level=LogLevel.DEBUG)
async def find_dd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DdBinary:
    """Resolve `dd` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="dd",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="dd file")
    return DdBinary(chosen.path, chosen.fingerprint)
3✔
947

948

949
@rule(desc="Finding the `df` binary", level=LogLevel.DEBUG)
async def find_df(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DfBinary:
    """Resolve `df` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="df",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="df file")
    return DfBinary(chosen.path, chosen.fingerprint)
3✔
955

956

957
@rule(desc="Finding the `diff` binary", level=LogLevel.DEBUG)
async def find_diff(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DiffBinary:
    """Resolve `diff` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="diff",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="compare files line by line")
    return DiffBinary(chosen.path, chosen.fingerprint)
1✔
963

964

965
@rule(desc="Finding the `dirname` binary", level=LogLevel.DEBUG)
async def find_dirname(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DirnameBinary:
    """Resolve `dirname` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="dirname",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="dirname file")
    return DirnameBinary(chosen.path, chosen.fingerprint)
3✔
973

974

975
@rule(desc="Finding the `du` binary", level=LogLevel.DEBUG)
async def find_du(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> DuBinary:
    """Resolve `du` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="du",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="du file")
    return DuBinary(chosen.path, chosen.fingerprint)
3✔
981

982

983
@rule(desc="Finding the `expr` binary", level=LogLevel.DEBUG)
async def find_expr(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ExprBinary:
    """Resolve `expr` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="expr",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="expr file")
    return ExprBinary(chosen.path, chosen.fingerprint)
3✔
989

990

991
@rule(desc="Finding the `find` binary", level=LogLevel.DEBUG)
async def find_find(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> FindBinary:
    """Resolve `find` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="find",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="find file")
    return FindBinary(chosen.path, chosen.fingerprint)
4✔
997

998

999
@rule(desc="Finding the `git` binary", level=LogLevel.DEBUG)
async def find_git(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GitBinary:
    """Resolve `git` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="git",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    why_needed = "track changes to files in your build environment"
    chosen = candidates.first_path_or_raise(lookup, rationale=why_needed)
    return GitBinary(chosen.path, chosen.fingerprint)
×
1007

1008

1009
@rule(desc="Finding the `getent` binary", level=LogLevel.DEBUG)
async def find_getent(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GetentBinary:
    """Resolve `getent` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="getent",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="getent file")
    return GetentBinary(chosen.path, chosen.fingerprint)
×
1017

1018

1019
@rule(desc="Finding the `gpg` binary", level=LogLevel.DEBUG)
async def find_gpg(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GpgBinary:
    """Resolve `gpg` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="gpg",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="gpg file")
    return GpgBinary(chosen.path, chosen.fingerprint)
×
1025

1026

1027
@rule(desc="Finding the `gzip` binary", level=LogLevel.DEBUG)
async def find_gzip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> GzipBinary:
    """Resolve `gzip` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="gzip",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="gzip file")
    return GzipBinary(chosen.path, chosen.fingerprint)
3✔
1033

1034

1035
@rule(desc="Finding the `head` binary", level=LogLevel.DEBUG)
async def find_head(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> HeadBinary:
    """Resolve `head` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="head",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="head file")
    return HeadBinary(chosen.path, chosen.fingerprint)
3✔
1041

1042

1043
@rule(desc="Finding the `id` binary", level=LogLevel.DEBUG)
async def find_id(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> IdBinary:
    """Resolve `id` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="id",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="id file")
    return IdBinary(chosen.path, chosen.fingerprint)
3✔
1049

1050

1051
@rule(desc="Finding the `ln` binary", level=LogLevel.DEBUG)
async def find_ln(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> LnBinary:
    """Resolve `ln` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="ln",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="link files")
    return LnBinary(chosen.path, chosen.fingerprint)
12✔
1057

1058

1059
@rule(desc="Finding the `lz4` binary", level=LogLevel.DEBUG)
async def find_lz4(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Lz4Binary:
    """Resolve `lz4` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="lz4",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="lz4 file")
    return Lz4Binary(chosen.path, chosen.fingerprint)
×
1065

1066

1067
@rule(desc="Finding the `lzop` binary", level=LogLevel.DEBUG)
async def find_lzop(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> LzopBinary:
    """Resolve `lzop` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="lzop",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="lzop file")
    return LzopBinary(chosen.path, chosen.fingerprint)
×
1073

1074

1075
@rule(desc="Finding the `md5sum` binary", level=LogLevel.DEBUG)
async def find_md5sum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> Md5sumBinary:
    """Resolve `md5sum` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="md5sum",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="md5sum file")
    return Md5sumBinary(chosen.path, chosen.fingerprint)
×
1083

1084

1085
@rule(desc="Finding the `mkdir` binary", level=LogLevel.DEBUG)
async def find_mkdir(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MkdirBinary:
    """Resolve `mkdir` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="mkdir",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="create directories")
    return MkdirBinary(chosen.path, chosen.fingerprint)
12✔
1093

1094

1095
# The rule description previously read "mktempt"; it now names the binary that is actually
# searched for (`mktemp`).
@rule(desc="Finding the `mktemp` binary", level=LogLevel.DEBUG)
async def find_mktemp(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MktempBinary:
    """Resolve `mktemp` from the configured system binary search paths.

    Searches `system_binaries.system_binary_paths` for a binary named `mktemp` and wraps the
    first match's path and fingerprint in a `MktempBinary`. The not-found case is delegated to
    `first_path_or_raise`, which receives the rationale string below.
    """
    request = BinaryPathRequest(
        binary_name="mktemp", search_path=system_binaries.system_binary_paths
    )
    paths = await find_binary(request, **implicitly())
    first_path = paths.first_path_or_raise(request, rationale="create temporary files/directories")
    return MktempBinary(first_path.path, first_path.fingerprint)
3✔
1103

1104

1105
@rule(desc="Finding the `mv` binary", level=LogLevel.DEBUG)
async def find_mv(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> MvBinary:
    """Resolve `mv` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="mv",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="move files")
    return MvBinary(chosen.path, chosen.fingerprint)
5✔
1111

1112

1113
@rule(desc="Finding the `open` binary", level=LogLevel.DEBUG)
async def find_open(
    platform: Platform, system_binaries: SystemBinariesSubsystem.EnvironmentAware
) -> OpenBinary:
    """Resolve the platform's URL/file opener from the system binary search paths.

    The binary searched for is `open` when `platform.is_macos` is true, and `xdg-open`
    otherwise.
    """
    opener_name = "open" if platform.is_macos else "xdg-open"
    lookup = BinaryPathRequest(
        binary_name=opener_name,
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="open URLs with default browser")
    return OpenBinary(chosen.path, chosen.fingerprint)
×
1124

1125

1126
@rule(desc="Finding the `pwd` binary", level=LogLevel.DEBUG)
async def find_pwd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> PwdBinary:
    """Resolve `pwd` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="pwd",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="pwd file")
    return PwdBinary(chosen.path, chosen.fingerprint)
3✔
1132

1133

1134
@rule(desc="Finding the `readlink` binary", level=LogLevel.DEBUG)
async def find_readlink(
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
) -> ReadlinkBinary:
    """Resolve `readlink` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="readlink",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="dereference symlinks")
    return ReadlinkBinary(chosen.path, chosen.fingerprint)
1✔
1144

1145

1146
@rule(desc="Finding the `rm` binary", level=LogLevel.DEBUG)
async def find_rm(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> RmBinary:
    """Resolve `rm` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="rm",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="rm file")
    return RmBinary(chosen.path, chosen.fingerprint)
4✔
1152

1153

1154
@rule(desc="Finding the `sed` binary", level=LogLevel.DEBUG)
async def find_sed(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> SedBinary:
    """Resolve `sed` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="sed",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="sed file")
    return SedBinary(chosen.path, chosen.fingerprint)
3✔
1160

1161

1162
@rule(desc="Finding the `sh` binary", level=LogLevel.DEBUG)
async def find_sh(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ShBinary:
    """Resolve `sh` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="sh",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="sh file")
    return ShBinary(chosen.path, chosen.fingerprint)
4✔
1168

1169

1170
@rule(desc="Finding the `shasum` binary", level=LogLevel.DEBUG)
async def find_shasum(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ShasumBinary:
    """Resolve `shasum` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="shasum",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="shasum file")
    return ShasumBinary(chosen.path, chosen.fingerprint)
×
1178

1179

1180
@rule(desc="Finding the `sort` binary", level=LogLevel.DEBUG)
async def find_sort(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> SortBinary:
    """Resolve `sort` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="sort",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="sort file")
    return SortBinary(chosen.path, chosen.fingerprint)
3✔
1186

1187

1188
@rule(desc="Finding the `tail` binary", level=LogLevel.DEBUG)
async def find_tail(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TailBinary:
    """Resolve `tail` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="tail",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="tail file")
    return TailBinary(chosen.path, chosen.fingerprint)
3✔
1194

1195

1196
@rule(desc="Finding the `tar` binary", level=LogLevel.DEBUG)
async def find_tar(
    platform: Platform, system_binaries: SystemBinariesSubsystem.EnvironmentAware
) -> TarBinary:
    """Resolve `tar` from the system binary search paths, with a `--version` test attached.

    The resulting `TarBinary` also carries the `platform` it was resolved for.
    """
    version_probe = BinaryPathTest(args=["--version"])
    lookup = BinaryPathRequest(
        binary_name="tar",
        search_path=system_binaries.system_binary_paths,
        test=version_probe,
    )
    candidates = await find_binary(lookup, **implicitly())
    why_needed = "download the tools Pants needs to run"
    chosen = candidates.first_path_or_raise(lookup, rationale=why_needed)
    return TarBinary(chosen.path, chosen.fingerprint, platform)
12✔
1210

1211

1212
@rule(desc="Finding the `test` binary", level=LogLevel.DEBUG)
async def find_test(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TestBinary:
    """Resolve `test` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="test",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="test file")
    return TestBinary(chosen.path, chosen.fingerprint)
4✔
1218

1219

1220
@rule(desc="Finding the `touch` binary", level=LogLevel.DEBUG)
async def find_touch(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TouchBinary:
    """Resolve `touch` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="touch",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="touch file")
    return TouchBinary(chosen.path, chosen.fingerprint)
2✔
1228

1229

1230
@rule(desc="Finding the `tr` binary", level=LogLevel.DEBUG)
async def find_tr(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> TrBinary:
    """Resolve `tr` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="tr",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="tr file")
    return TrBinary(chosen.path, chosen.fingerprint)
3✔
1236

1237

1238
@rule(desc="Finding the `unzip` binary", level=LogLevel.DEBUG)
async def find_unzip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> UnzipBinary:
    """Resolve `unzip` from the system binary search paths, with a `-v` test attached."""
    version_probe = BinaryPathTest(args=["-v"])
    lookup = BinaryPathRequest(
        binary_name="unzip",
        search_path=system_binaries.system_binary_paths,
        test=version_probe,
    )
    candidates = await find_binary(lookup, **implicitly())
    why_needed = "download the tools Pants needs to run"
    chosen = candidates.first_path_or_raise(lookup, rationale=why_needed)
    return UnzipBinary(chosen.path, chosen.fingerprint)
12✔
1250

1251

1252
@rule(desc="Finding the `wc` binary", level=LogLevel.DEBUG)
async def find_wc(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> WcBinary:
    """Resolve `wc` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="wc",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="wc file")
    return WcBinary(chosen.path, chosen.fingerprint)
3✔
1258

1259

1260
@rule(desc="Finding the `xargs` binary", level=LogLevel.DEBUG)
async def find_xargs(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> XargsBinary:
    """Resolve `xargs` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="xargs",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="xargs file")
    return XargsBinary(chosen.path, chosen.fingerprint)
3✔
1268

1269

1270
@rule(desc="Finding the `xz` binary", level=LogLevel.DEBUG)
async def find_xz(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> XzBinary:
    """Resolve `xz` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="xz",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="xz file")
    return XzBinary(chosen.path, chosen.fingerprint)
×
1276

1277

1278
@rule(desc="Finding the `zip` binary", level=LogLevel.DEBUG)
async def find_zip(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ZipBinary:
    """Resolve `zip` from the system binary search paths, with a `-v` test attached."""
    version_probe = BinaryPathTest(args=["-v"])
    lookup = BinaryPathRequest(
        binary_name="zip",
        search_path=system_binaries.system_binary_paths,
        test=version_probe,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="create `.zip` archives")
    return ZipBinary(chosen.path, chosen.fingerprint)
10✔
1288

1289

1290
@rule(desc="Finding the `zstd` binary", level=LogLevel.DEBUG)
async def find_zstd(system_binaries: SystemBinariesSubsystem.EnvironmentAware) -> ZstdBinary:
    """Resolve `zstd` from the configured system binary search paths."""
    lookup = BinaryPathRequest(
        binary_name="zstd",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path_or_raise(lookup, rationale="zstd file")
    return ZstdBinary(chosen.path, chosen.fingerprint)
×
1296

1297

1298
def rules():
    """Return the rules collected from this module followed by the `python_bootstrap` rules."""
    combined = list(collect_rules())
    combined.extend(python_bootstrap.rules())
    return combined
12✔
1300

1301

1302
# -------------------------------------------------------------------------------------------
1303
# Rules for fallible binaries
1304
# -------------------------------------------------------------------------------------------
1305

1306

1307
@dataclass(frozen=True)
class MaybeGitBinary:
    """Optional wrapper around a `GitBinary`, for callers that can proceed without `git`."""

    # `None` (the default) means no `git` binary is held.
    git_binary: GitBinary | None = None
12✔
1310

1311

1312
@rule(desc="Finding the `git` binary", level=LogLevel.DEBUG)
async def maybe_find_git(
    system_binaries: SystemBinariesSubsystem.EnvironmentAware,
) -> MaybeGitBinary:
    """Look for `git` on the system binary search paths without requiring it to exist.

    Returns an empty `MaybeGitBinary` when the search yields no first path, and otherwise one
    wrapping a `GitBinary` built from that path and its fingerprint.
    """
    lookup = BinaryPathRequest(
        binary_name="git",
        search_path=system_binaries.system_binary_paths,
    )
    candidates = await find_binary(lookup, **implicitly())
    chosen = candidates.first_path
    if chosen:
        return MaybeGitBinary(GitBinary(chosen.path, chosen.fingerprint))

    return MaybeGitBinary()
2✔
1323

1324

1325
class MaybeGitBinaryRequest:
    """Field-less request type consumed by `maybe_find_git_wrapper`."""
12✔
1327

1328

1329
@rule
async def maybe_find_git_wrapper(
    _: MaybeGitBinaryRequest, maybe_git_binary: MaybeGitBinary
) -> MaybeGitBinary:
    """Return the provided `MaybeGitBinary` unchanged for a `MaybeGitBinaryRequest`.

    The request argument itself is ignored.
    """
    return maybe_git_binary
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc