• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26380816428

25 May 2026 02:57AM UTC coverage: 52.312% (-40.6%) from 92.89%
26380816428

Pull #23368

github

web-flow
Merge 7410b48e1 into 7b1060c81
Pull Request #23368: Run Linux ARM CI on Depot runners (Cherry-pick of #23363)

31807 of 60802 relevant lines covered (52.31%)

1.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.01
/src/python/pants/vcs/git.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import dataclasses
2✔
7
import logging
2✔
8
import os
2✔
9
import re
2✔
10
from collections import defaultdict
2✔
11
from collections.abc import Iterable
2✔
12
from dataclasses import dataclass
2✔
13
from functools import cached_property
2✔
14
from io import BytesIO
2✔
15
from os import PathLike
2✔
16
from pathlib import Path, PurePath
2✔
17
from typing import Any, DefaultDict
2✔
18

19
from pants.core.util_rules.system_binaries import GitBinary, GitBinaryException, MaybeGitBinary
2✔
20
from pants.engine.engine_aware import EngineAwareReturnType
2✔
21
from pants.engine.rules import collect_rules, rule
2✔
22
from pants.util.contextutil import pushd
2✔
23
from pants.vcs.change import ChangedFile, ChangeType
2✔
24
from pants.vcs.hunk import Hunk, TextBlock
2✔
25

26
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
2✔
27

28

29
class GitWorktree(EngineAwareReturnType):
    """Implements a safe wrapper for un-sandboxed access to Git in the user's working copy.

    This type (and any wrappers) should be marked `EngineAwareReturnType.cacheable=False`, because
    it internally uses un-sandboxed APIs, and `@rules` which produce it should re-run in each
    session. It additionally implements a default `__eq__` in order to prevent early-cutoff in the
    graph, and force any consumers of the type to re-run.
    """

    worktree: PurePath
    _gitdir: PurePath
    _git_binary: GitBinary

    def __init__(
        self,
        binary: GitBinary,
        worktree: PathLike[str] | None = None,
        gitdir: PathLike[str] | None = None,
    ) -> None:
        """Creates a git object that assumes the git repository is in the cwd by default.

        binary:    The git binary to use.
        worktree:  The path to the git repository working tree directory (typically '.').
        gitdir:    The path to the repository's git metadata directory (typically '.git').
        """
        self.worktree = Path(worktree or os.getcwd()).resolve()
        self._gitdir = Path(gitdir).resolve() if gitdir else (self.worktree / ".git")
        self._git_binary = binary
        self._diff_parser = DiffParser()

    def cacheable(self) -> bool:
        """Always False; see the class docstring for why this type must not be cached."""
        return False

    @property
    def current_rev_identifier(self) -> str:
        """The symbolic name used for the current revision (always `HEAD`)."""
        return "HEAD"

    @property
    def commit_id(self) -> str:
        """The decoded output of `git rev-parse HEAD`."""
        return self._git("rev-parse", "HEAD").decode()

    @property
    def branch_name(self) -> str | None:
        """The decoded output of `git rev-parse --abbrev-ref HEAD`.

        Returns None when git reports the literal `HEAD` rather than a branch name.
        """
        branch = self._git("rev-parse", "--abbrev-ref", "HEAD").decode()
        return None if branch == "HEAD" else branch

    def _fix_git_relative_path(self, worktree_path: str, relative_to: PurePath | str) -> str:
        """Re-express a worktree-relative path as a path relative to `relative_to`.

        Raises ValueError (from `PurePath.relative_to`) if the resulting absolute path is not
        located under `relative_to`.
        """
        return str((self.worktree / worktree_path).relative_to(relative_to))

    def changed_files(
        self,
        from_commit: str | None = None,
        include_untracked: bool = False,
        relative_to: PurePath | str | None = None,
    ) -> set[ChangedFile]:
        """Collect the files that differ from `HEAD`, and optionally from `from_commit`.

        from_commit:       If set, also include the changes in `from_commit...HEAD`.
        include_untracked: If set, also include untracked, non-ignored files as `ADDED`.
        relative_to:       Directory that limits the query and that the returned paths are
                           relative to. Defaults to the worktree root.

        When the same path is reported by more than one of these queries, the change type from
        the later query wins (uncommitted, then committed, then untracked).
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        rel_suffix = ["--", str(relative_to)]

        files: dict[str, ChangeType] = {}

        uncommitted_changes = self._git(
            "diff", "--name-status", "--no-renames", "HEAD", *rel_suffix
        )
        files.update(self._parse_name_status(uncommitted_changes))

        if from_commit:
            # Grab the diff from the merge-base to HEAD using ... syntax.  This ensures we have just
            # the changes that have occurred on the current branch.
            committed_changes = self._git(
                "diff", "--name-status", "--no-renames", from_commit + "...HEAD", *rel_suffix
            )
            files.update(self._parse_name_status(committed_changes))

        if include_untracked:
            untracked = self._git(
                "ls-files", "--other", "--exclude-standard", "--full-name", *rel_suffix
            )
            files.update(
                {path: ChangeType.ADDED for path in untracked.decode().splitlines() if path}
            )

        # git will report changed files relative to the worktree: re-relativize to relative_to
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in files.items()
        }

    @staticmethod
    def _parse_name_status(output: bytes) -> dict[str, ChangeType]:
        """Parse `git diff --name-status` output into a dict of path -> ChangeType."""
        result: dict[str, ChangeType] = {}
        for line in output.decode().splitlines():
            if not line:
                continue
            status, _, path = line.partition("\t")
            # Only the first letter of the status field is significant here.
            letter = status[0].upper() if status else "M"
            try:
                change_type = ChangeType(letter)
            except ValueError:
                # Git may report various other esoteric statuses.
                # We treat anything that isn't A, D or M as if it were M.
                change_type = ChangeType.MODIFIED
            result[path] = change_type
        return result

    def changed_files_lines(
        self,
        paths: Iterable[str],
        /,
        *,
        from_commit: str | None = None,
        relative_to: PurePath | str | None = None,
        include_untracked: bool = False,
    ) -> dict[str, tuple[Hunk, ...]]:
        """Collect the changed hunks for the given paths.

        paths:             Paths to diff, joined onto `relative_to` before being passed to git.
        from_commit:       If set, also include the hunks in `from_commit...HEAD`; for a file
                           present in both diffs, these replace the uncommitted hunks.
        relative_to:       Base directory for `paths`. Defaults to the worktree root.
        include_untracked: If set, untracked files that are listed in `paths` are diffed
                           against `/dev/null`.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree

        # Materialize once: `paths` may be a one-shot iterator, and it is needed up to three
        # times below. An exhausted iterator would produce an empty pathspec, which git treats
        # as "no path limit" and would diff the whole tree instead of the requested files.
        requested_paths = tuple(paths)
        pathspecs = [str(relative_to / path) for path in requested_paths]

        result = self._git_diff("--unified=0", "HEAD", "--", *pathspecs)

        if from_commit:
            committed = self._git_diff(
                "--unified=0", from_commit + "...HEAD", "--", *pathspecs
            )
            result.update(committed)

        if include_untracked:
            # There is no git diff flag to include untracked files, so we get
            # the list of untracked files and manually create the diff by
            # comparing each file to an empty /dev/null.
            untracked_files = (
                self._git(
                    "ls-files",
                    "--other",
                    "--exclude-standard",
                    "--full-name",
                )
                .decode()
                .splitlines()
            )
            for file in set(untracked_files).intersection(requested_paths):
                untracked_diff = self._git_diff("--no-index", "/dev/null", str(relative_to / file))
                assert len(untracked_diff) == 1
                result[file] = next(iter(untracked_diff.values()))

        return result

    def _git(self, *args: str) -> bytes:
        """Run unsandboxed git command."""
        return self._git_binary._invoke_unsandboxed(self._create_git_cmdline(args))

    def _git_diff(self, *args: str) -> dict[str, tuple[Hunk, ...]]:
        """Run unsandboxed git diff command and parse the diff."""
        return self._diff_parser.parse_unified_diff(self._git("diff", *args))

    def changes_in(
        self, diffspec: str, relative_to: PurePath | str | None = None
    ) -> set[ChangedFile]:
        """Collect the files reported by `git diff-tree` for `diffspec`.

        Returned paths are relative to `relative_to`, which defaults to the worktree root.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        output = self._git(
            "diff-tree", "--no-commit-id", "--name-status", "--no-renames", "-r", diffspec
        )
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in self._parse_name_status(output).items()
        }

    def _create_git_cmdline(self, args: Iterable[str]) -> list[str]:
        """Prefix `args` with the `--git-dir` and `--work-tree` options for this worktree."""
        return [f"--git-dir={self._gitdir}", f"--work-tree={self.worktree}", *args]

    def __eq__(self, other: Any) -> bool:
        # NB: See the class doc regarding equality.
        return id(self) == id(other)
×
231

232

233
class ParseError(Exception):
    """Raised when diff output cannot be parsed."""
2✔
235

236

237
class DiffParser:
    """Parses the output of `git diff` into per-file hunks."""

    def parse_unified_diff(self, content: bytes) -> dict[str, tuple[Hunk, ...]]:
        """Parse unified diff output into a mapping of filename -> hunks.

        Only `diff --git` file header lines and `@@ ... @@` hunk header lines are interpreted;
        every other line is ignored. A file whose header is not followed by any hunk header is
        recorded with a single placeholder hunk (`left=None` and an empty right block).

        Raises:
            ParseError: if a hunk header appears before any file header.
            ValueError: if a file header or a hunk header cannot be parsed.
        """
        buf = BytesIO(content)
        current_file = None
        hunks: DefaultDict[str, list[Hunk]] = defaultdict(list)
        for line in buf:
            line = line.strip()

            if match := self._filename_regex.match(line):
                if current_file is not None:
                    # A new file starts: make sure the previous file has an entry even if no
                    # hunks were seen for it (`setdefault` leaves existing hunks untouched).
                    # mypy false positive: https://github.com/python/mypy/issues/14987
                    hunks.setdefault(
                        current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))]
                    )
                current_file = self._parse_filename(match)
                if current_file is None:
                    raise ValueError(f"failed to parse filename from line: `{line!r}`")
                continue

            if match := self._lines_changed_regex.match(line):
                if current_file is None:
                    raise ParseError(f"missing filename in the diff:\n{content!r}")

                try:
                    hunk = self._parse_hunk(match, line)
                except ValueError as e:
                    raise ValueError(f"Failed to parse hunk: {line!r}") from e

                hunks[current_file].append(hunk)
                continue

        # Same placeholder handling for the last file in the diff.
        if current_file is not None:
            hunks.setdefault(current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))])
        return {filename: tuple(file_hunks) for filename, file_hunks in hunks.items()}

    @cached_property
    def _lines_changed_regex(self) -> re.Pattern:
        """Pattern for a hunk header, e.g. `@@ -1,2 +3,4 @@`; the `,count` parts are optional."""
        return re.compile(rb"^@@ -([0-9]+)(,([0-9]+))? \+([0-9]+)(,([0-9]+))? @@.*")

    def _parse_hunk(self, match: re.Match, line: bytes) -> Hunk:
        """Build a `Hunk` from a `_lines_changed_regex` match.

        A missing `,count` in the header is interpreted as a count of 1. `line` is not used.
        """
        g = match.groups()
        return Hunk(
            left=TextBlock(
                start=int(g[0]),
                count=int(g[2]) if g[2] is not None else 1,
            ),
            right=TextBlock(
                start=int(g[3]),
                count=int(g[5]) if g[5] is not None else 1,
            ),
        )

    @cached_property
    def _filename_regex(self) -> re.Pattern:
        """Pattern for a `diff --git a/<path> b/<path>` header, unquoted or double-quoted.

        The b-side path is captured in the named group `unquoted` or `quoted`.
        """
        # This only handles whitespaces. It doesn't work if a filename has something weird
        # in it that needs escaping, e.g. a double quote.
        # NB: the a-side path is matched but deliberately not captured (`(?:...)`).
        a_file = rb'(?:a/(?:[^"]+)|"a/(?:(?:[^"]|\\")+)")'
        b_file = rb'(?:b/(?P<unquoted>[^"]+)|"b/(?P<quoted>(?:[^"]|\\")+)")'
        return re.compile(b"^diff --git " + a_file + b" " + b_file + b"$")

    def _parse_filename(self, match: re.Match) -> str | None:
        """Extract the b-side filename from a `_filename_regex` match.

        For the quoted form, `\\"` escape sequences are replaced with a plain double quote.
        """
        unquoted = g.decode() if (g := match.group("unquoted")) is not None else None
        quoted = (
            g.decode().replace(r"\"", '"') if (g := match.group("quoted")) is not None else None
        )
        return unquoted or quoted
×
303

304

305
@dataclasses.dataclass(frozen=True)
class MaybeGitWorktree(EngineAwareReturnType):
    """Holds either a `GitWorktree`, or the reason why none is available."""

    git_worktree: GitWorktree | None = None
    # If git_worktree is None, the reason why.
    failure_reason: str | None = None

    def cacheable(self) -> bool:
        """Always False, so that this value is not cached."""
        return False
×
312

313

314
@dataclass(frozen=True)
class GitWorktreeRequest:
    """Parameters for locating a git worktree."""

    # Passed through as the `gitdir` argument of `GitWorktree`.
    gitdir: PathLike[str] | None = None
    # If set, `git rev-parse --show-toplevel` is run from inside this directory.
    subdir: PathLike[str] | None = None
2✔
318

319

320
@rule
async def get_git_worktree(
    git_worktree_request: GitWorktreeRequest,
    maybe_git_binary: MaybeGitBinary,
) -> MaybeGitWorktree:
    """Locate the git worktree for the request by running `git rev-parse --show-toplevel`.

    The command runs from `git_worktree_request.subdir` when set, otherwise from the current
    working directory. Returns a `MaybeGitWorktree` carrying a `failure_reason` when no git
    binary is available or when the git command raises `GitBinaryException`.
    """
    if not maybe_git_binary.git_binary:
        return MaybeGitWorktree(failure_reason="couldn't find `git` binary")

    git_binary = maybe_git_binary.git_binary
    cmd = ["rev-parse", "--show-toplevel"]
    subdir = git_worktree_request.subdir

    try:
        if subdir:
            with pushd(str(subdir)):
                output = git_binary._invoke_unsandboxed(cmd)
        else:
            output = git_binary._invoke_unsandboxed(cmd)
    except GitBinaryException as e:
        # Report the directory git was actually run in. By the time this handler runs the
        # `with pushd(...)` block has been exited (presumably restoring the previous cwd), so
        # `os.getcwd()` alone would name the wrong directory whenever `subdir` is set.
        searched_dir = os.path.abspath(subdir) if subdir else os.getcwd()
        failure_msg = f"no git repository at {searched_dir}: {e!r}"
        logger.info(failure_msg)
        return MaybeGitWorktree(failure_reason=failure_msg)

    git_worktree = GitWorktree(
        binary=git_binary,
        gitdir=git_worktree_request.gitdir,
        worktree=PurePath(output.decode()),
    )

    logger.debug(
        f"Detected git repository at {git_worktree.worktree} on branch {git_worktree.branch_name}"
    )
    return MaybeGitWorktree(git_worktree=git_worktree)
×
352

353

354
def rules():
    """Return the rules gathered by `collect_rules()` as a list."""
    return list(collect_rules())
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc