• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26260209689

21 May 2026 11:59PM UTC coverage: 75.453% (-15.7%) from 91.156%
26260209689

Pull #23365

github

web-flow
Merge 5fe873b58 into 7ea655ba0
Pull Request #23365: uv.lock -> pex optimization

5 of 16 new or added lines in 1 file covered. (31.25%)

10118 existing lines in 378 files now uncovered.

54669 of 72454 relevant lines covered (75.45%)

2.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.01
/src/python/pants/vcs/git.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
5✔
5

6
import dataclasses
5✔
7
import logging
5✔
8
import os
5✔
9
import re
5✔
10
from collections import defaultdict
5✔
11
from collections.abc import Iterable
5✔
12
from dataclasses import dataclass
5✔
13
from functools import cached_property
5✔
14
from io import BytesIO
5✔
15
from os import PathLike
5✔
16
from pathlib import Path, PurePath
5✔
17
from typing import Any, DefaultDict
5✔
18

19
from pants.core.util_rules.system_binaries import GitBinary, GitBinaryException, MaybeGitBinary
5✔
20
from pants.engine.engine_aware import EngineAwareReturnType
5✔
21
from pants.engine.rules import collect_rules, rule
5✔
22
from pants.util.contextutil import pushd
5✔
23
from pants.vcs.change import ChangedFile, ChangeType
5✔
24
from pants.vcs.hunk import Hunk, TextBlock
5✔
25

26
logger = logging.getLogger(__name__)
5✔
27

28

29
class GitWorktree(EngineAwareReturnType):
    """Implements a safe wrapper for un-sandboxed access to Git in the user's working copy.

    This type (and any wrappers) should be marked `EngineAwareReturnType.cacheable=False`, because
    it internally uses un-sandboxed APIs, and `@rules` which produce it should re-run in each
    session. It additionally implements a default `__eq__` in order to prevent early-cutoff in the
    graph, and force any consumers of the type to re-run.
    """

    worktree: PurePath
    _gitdir: PurePath
    _git_binary: GitBinary
    _diff_parser: DiffParser

    def __init__(
        self,
        binary: GitBinary,
        worktree: PathLike[str] | None = None,
        gitdir: PathLike[str] | None = None,
    ) -> None:
        """Creates a git object that assumes the git repository is in the cwd by default.

        binary:    The git binary to use.
        worktree:  The path to the git repository working tree directory (typically '.').
        gitdir:    The path to the repository's git metadata directory (typically '.git').
        """
        self.worktree = Path(worktree or os.getcwd()).resolve()
        # Without an explicit gitdir, fall back to `<worktree>/.git`.
        self._gitdir = Path(gitdir).resolve() if gitdir else (self.worktree / ".git")
        self._git_binary = binary
        self._diff_parser = DiffParser()

    def cacheable(self) -> bool:
        """Always False; see the class docstring for why this type must not be cached."""
        return False

    @property
    def current_rev_identifier(self) -> str:
        """The symbolic name used for the current revision (always "HEAD")."""
        return "HEAD"

    @property
    def commit_id(self) -> str:
        """The decoded output of `git rev-parse HEAD`."""
        return self._git("rev-parse", "HEAD").decode()

    @property
    def branch_name(self) -> str | None:
        """The decoded output of `git rev-parse --abbrev-ref HEAD`, or None if that is "HEAD"."""
        branch = self._git("rev-parse", "--abbrev-ref", "HEAD").decode()
        return None if branch == "HEAD" else branch

    def _fix_git_relative_path(self, worktree_path: str, relative_to: PurePath | str) -> str:
        """Re-express a worktree-relative path as a path relative to `relative_to`."""
        return str((self.worktree / worktree_path).relative_to(relative_to))

    def changed_files(
        self,
        from_commit: str | None = None,
        include_untracked: bool = False,
        relative_to: PurePath | str | None = None,
    ) -> set[ChangedFile]:
        """Return the files changed under `relative_to`, with their change types.

        from_commit:        If set, also include changes from the merge-base of this commit and
                            HEAD up to HEAD.
        include_untracked:  If True, also report untracked (non-ignored) files as ADDED.
        relative_to:        Directory that limits the query and that returned paths are relative
                            to. Defaults to the worktree root.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        rel_suffix = ["--", str(relative_to)]

        # Later sources overwrite earlier ones for the same path, so the order matters:
        # uncommitted changes, then committed changes, then untracked files.
        files: dict[str, ChangeType] = {}
        files.update(
            self._parse_name_status(
                self._git("diff", "--name-status", "--no-renames", "HEAD", *rel_suffix)
            )
        )

        if from_commit:
            # Grab the diff from the merge-base to HEAD using ... syntax.  This ensures we have just
            # the changes that have occurred on the current branch.
            committed_changes = self._git(
                "diff",
                "--name-status",
                "--no-renames",
                from_commit + "...HEAD",
                *rel_suffix,
            )
            files.update(self._parse_name_status(committed_changes))

        if include_untracked:
            untracked = self._git(
                "ls-files",
                "--other",
                "--exclude-standard",
                "--full-name",
                *rel_suffix,
            )
            files.update(
                {path: ChangeType.ADDED for path in untracked.decode().splitlines() if path}
            )

        # git will report changed files relative to the worktree: re-relativize to relative_to
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in files.items()
        }

    @staticmethod
    def _parse_name_status(output: bytes) -> dict[str, ChangeType]:
        """Parse `git diff --name-status` output into a dict of path -> ChangeType."""
        result: dict[str, ChangeType] = {}
        for line in output.decode().splitlines():
            if not line:
                continue
            status, _, path = line.partition("\t")
            # Only the first letter of the status field is used.
            letter = status[0].upper() if status else "M"
            try:
                change_type = ChangeType(letter)
            except ValueError:
                # Git may report various other esoteric statuses.
                # We treat anything that isn't A, D or M as if it were M.
                change_type = ChangeType.MODIFIED
            result[path] = change_type
        return result

    def changed_files_lines(
        self,
        paths: Iterable[str],
        /,
        *,
        from_commit: str | None = None,
        relative_to: PurePath | str | None = None,
        include_untracked: bool = False,
    ) -> dict[str, tuple[Hunk, ...]]:
        """Return the changed hunks for each of the given paths.

        paths:              Paths (joined onto `relative_to`) to diff. May be any iterable,
                            including a one-shot iterator.
        from_commit:        If set, also diff from the merge-base of this commit and HEAD to HEAD;
                            those hunks replace the uncommitted ones for the same file.
        relative_to:        Base directory for `paths`. Defaults to the worktree root.
        include_untracked:  If True, untracked files among `paths` are diffed against /dev/null.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree

        # Materialize once: `paths` is consumed several times below, and a one-shot iterator
        # would otherwise leave the later git invocations without any pathspec.
        requested_paths = tuple(paths)
        pathspecs = [str(relative_to / path) for path in requested_paths]

        result = self._git_diff("--unified=0", "HEAD", "--", *pathspecs)

        if from_commit:
            result.update(
                self._git_diff("--unified=0", from_commit + "...HEAD", "--", *pathspecs)
            )

        if include_untracked:
            # There is no git diff flag to include untracked files, so we get
            # the list of untracked files and manually create the diff by
            # comparing each file to an empty /dev/null.
            untracked_files = (
                self._git(
                    "ls-files",
                    "--other",
                    "--exclude-standard",
                    "--full-name",
                )
                .decode()
                .splitlines()
            )
            for file in set(untracked_files).intersection(requested_paths):
                untracked_diff = self._git_diff("--no-index", "/dev/null", str(relative_to / file))
                # Diffing a single file must yield exactly one entry.
                assert len(untracked_diff) == 1
                result[file] = next(iter(untracked_diff.values()))

        return result

    def _git(self, *args: str) -> bytes:
        """Run unsandboxed git command."""
        return self._git_binary._invoke_unsandboxed(self._create_git_cmdline(args))

    def _git_diff(self, *args: str) -> dict[str, tuple[Hunk, ...]]:
        """Run unsandboxed git diff command and parse the diff."""
        return self._diff_parser.parse_unified_diff(self._git("diff", *args))

    def changes_in(
        self, diffspec: str, relative_to: PurePath | str | None = None
    ) -> set[ChangedFile]:
        """Return the files changed in `diffspec` (via `git diff-tree`), relative to `relative_to`.

        `relative_to` defaults to the worktree root.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        output = self._git(
            "diff-tree", "--no-commit-id", "--name-status", "--no-renames", "-r", diffspec
        )
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in self._parse_name_status(output).items()
        }

    def _create_git_cmdline(self, args: Iterable[str]) -> list[str]:
        """Prefix `args` with the `--git-dir` and `--work-tree` options for this worktree."""
        return [f"--git-dir={self._gitdir}", f"--work-tree={self.worktree}", *args]

    def __eq__(self, other: Any) -> bool:
        # NB: See the class doc regarding equality.
        return self is other
×
231

232

233
class ParseError(Exception):
    """Raised when diff output cannot be parsed, e.g. a hunk appears before any file header."""
5✔
235

236

237
class DiffParser:
    """Parses `git diff` output into the hunks changed in each file."""

    def parse_unified_diff(self, content: bytes) -> dict[str, tuple[Hunk, ...]]:
        """Parse raw unified-diff bytes into a mapping of filename -> hunks.

        Only `diff --git` file headers and `@@ ... @@` hunk headers are inspected; every other
        line is ignored. A file that has a header but no hunks is mapped to a single placeholder
        hunk with `left=None` and an empty right block.

        Raises:
            ParseError: if a hunk header appears before any file header.
            ValueError: if a filename or a hunk header cannot be parsed.
        """
        buf = BytesIO(content)
        current_file = None
        hunks: DefaultDict[str, list[Hunk]] = defaultdict(list)
        for line in buf:
            line = line.strip()

            if match := self._filename_regex.match(line):
                if current_file is not None:
                    # The previous file is complete: make sure it has an entry even if no hunk
                    # headers were seen for it.
                    # mypy false positive: https://github.com/python/mypy/issues/14987
                    hunks.setdefault(
                        current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))]
                    )
                current_file = self._parse_filename(match)
                if current_file is None:
                    raise ValueError(f"failed to parse filename from line: `{line!r}`")
                continue

            if match := self._lines_changed_regex.match(line):
                if current_file is None:
                    raise ParseError(f"missing filename in the diff:\n{content!r}")

                try:
                    hunk = self._parse_hunk(match, line)
                except ValueError as e:
                    raise ValueError(f"Failed to parse hunk: {line!r}") from e

                hunks[current_file].append(hunk)
                continue

        # Same placeholder treatment for the last file in the diff.
        if current_file is not None:
            hunks.setdefault(current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))])
        return {filename: tuple(file_hunks) for filename, file_hunks in hunks.items()}

    @cached_property
    def _lines_changed_regex(self) -> re.Pattern:
        """Pattern for a hunk header: `@@ -start[,count] +start[,count] @@`."""
        return re.compile(rb"^@@ -([0-9]+)(,([0-9]+))? \+([0-9]+)(,([0-9]+))? @@.*")

    def _parse_hunk(self, match: re.Match, line: bytes) -> Hunk:
        """Build a Hunk from a `_lines_changed_regex` match; an omitted count means 1."""
        g = match.groups()
        return Hunk(
            left=TextBlock(
                start=int(g[0]),
                count=int(g[2]) if g[2] is not None else 1,
            ),
            right=TextBlock(
                start=int(g[3]),
                count=int(g[5]) if g[5] is not None else 1,
            ),
        )

    @cached_property
    def _filename_regex(self) -> re.Pattern:
        """Pattern for a `diff --git a/<path> b/<path>` header, plain or double-quoted.

        The `b/` path is captured in the named group `unquoted` or `quoted`.
        """
        # This only handles whitespaces. It doesn't work if a filename has something weird
        # in it that needs escaping, e.g. a double quote.
        # The a-side path is matched but never captured, so all of its groups are non-capturing.
        a_file = rb'(?:a/(?:[^"]+)|"a/(?:(?:[^"]|\\")+)")'
        b_file = rb'(?:b/(?P<unquoted>[^"]+)|"b/(?P<quoted>(?:[^"]|\\")+)")'
        return re.compile(b"^diff --git " + a_file + b" " + b_file + b"$")

    def _parse_filename(self, match: re.Match) -> str | None:
        """Extract the decoded `b/` filename from a `_filename_regex` match.

        For the quoted form, backslash-escaped double quotes are unescaped.
        """
        unquoted = g.decode() if (g := match.group("unquoted")) is not None else None
        quoted = (
            g.decode().replace(r"\"", '"') if (g := match.group("quoted")) is not None else None
        )
        return unquoted or quoted
×
303

304

305
@dataclass(frozen=True)
class MaybeGitWorktree(EngineAwareReturnType):
    """Holds either a `GitWorktree` or, when there is none, the reason it is missing."""

    git_worktree: GitWorktree | None = None
    # If git_worktree is None, the reason why.
    failure_reason: str | None = None

    def cacheable(self) -> bool:
        """Always False, so this wrapper is never treated as cacheable."""
        return False
×
312

313

314
@dataclasses.dataclass(frozen=True)
class GitWorktreeRequest:
    """Immutable request for a git worktree lookup; both fields are optional."""

    # Passed through as the `gitdir` of the resulting GitWorktree.
    gitdir: PathLike[str] | None = None
    # When set, the repository lookup is run from inside this directory.
    subdir: PathLike[str] | None = None
5✔
318

319

320
@rule
async def get_git_worktree(
    git_worktree_request: GitWorktreeRequest,
    maybe_git_binary: MaybeGitBinary,
) -> MaybeGitWorktree:
    """Locate the git worktree via `rev-parse --show-toplevel`.

    The lookup runs from `git_worktree_request.subdir` when it is set, and from the current
    working directory otherwise. Returns a `MaybeGitWorktree` carrying a `failure_reason`
    instead of a worktree when no git binary is available or the lookup fails.
    """
    git_binary = maybe_git_binary.git_binary
    if not git_binary:
        return MaybeGitWorktree(failure_reason="couldn't find `git` binary")

    cmd = ["rev-parse", "--show-toplevel"]
    subdir = git_worktree_request.subdir

    try:
        if subdir:
            with pushd(str(subdir)):
                output = git_binary._invoke_unsandboxed(cmd)
        else:
            output = git_binary._invoke_unsandboxed(cmd)
    except GitBinaryException as e:
        # By now `pushd` has restored the original cwd, so report the directory that was
        # actually probed rather than whatever os.getcwd() returns at this point.
        probed_dir = str(subdir) if subdir else os.getcwd()
        failure_msg = f"no git repository at {probed_dir}: {e!r}"
        logger.info(failure_msg)
        return MaybeGitWorktree(failure_reason=failure_msg)

    git_worktree = GitWorktree(
        binary=git_binary,
        gitdir=git_worktree_request.gitdir,
        worktree=PurePath(output.decode()),
    )

    # `branch_name` invokes git, so only evaluate it when the message will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            f"Detected git repository at {git_worktree.worktree} on branch {git_worktree.branch_name}"
        )
    return MaybeGitWorktree(git_worktree=git_worktree)
×
352

353

354
def rules():
    """Return the rules collected by `collect_rules()` as a list."""
    return list(collect_rules())
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc