• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.35
/src/python/pants/vcs/git.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import dataclasses
1✔
7
import logging
1✔
8
import os
1✔
9
import re
1✔
10
from collections import defaultdict
1✔
11
from collections.abc import Iterable
1✔
12
from dataclasses import dataclass
1✔
13
from functools import cached_property
1✔
14
from io import BytesIO
1✔
15
from os import PathLike
1✔
16
from pathlib import Path, PurePath
1✔
17
from typing import Any, DefaultDict
1✔
18

19
from pants.core.util_rules.system_binaries import GitBinary, GitBinaryException, MaybeGitBinary
1✔
20
from pants.engine.engine_aware import EngineAwareReturnType
1✔
21
from pants.engine.rules import collect_rules, rule
1✔
22
from pants.util.contextutil import pushd
1✔
23
from pants.vcs.hunk import Hunk, TextBlock
1✔
24

25
logger = logging.getLogger(__name__)
1✔
26

27

28
class GitWorktree(EngineAwareReturnType):
    """Implements a safe wrapper for un-sandboxed access to Git in the user's working copy.

    This type (and any wrappers) should be marked `EngineAwareReturnType.cacheable=False`, because
    it internally uses un-sandboxed APIs, and `@rules` which produce it should re-run in each
    session. It additionally implements a default `__eq__` in order to prevent early-cutoff in the
    graph, and force any consumers of the type to re-run.
    """

    worktree: PurePath
    _gitdir: PurePath
    _git_binary: GitBinary
    _diff_parser: DiffParser

    def __init__(
        self,
        binary: GitBinary,
        worktree: PathLike[str] | None = None,
        gitdir: PathLike[str] | None = None,
    ) -> None:
        """Creates a git object that assumes the git repository is in the cwd by default.

        binary:    The git binary to use.
        worktree:  The path to the git repository working tree directory (typically '.').
        gitdir:    The path to the repository's git metadata directory (typically '.git').
        """
        self.worktree = Path(worktree or os.getcwd()).resolve()
        self._gitdir = Path(gitdir).resolve() if gitdir else (self.worktree / ".git")
        self._git_binary = binary
        self._diff_parser = DiffParser()

    def cacheable(self) -> bool:
        """Always False: see the class doc regarding caching."""
        return False

    @property
    def current_rev_identifier(self) -> str:
        """The symbolic name used for the current revision."""
        return "HEAD"

    @property
    def commit_id(self) -> str:
        """The decoded output of `git rev-parse HEAD`."""
        return self._git("rev-parse", "HEAD").decode()

    @property
    def branch_name(self) -> str | None:
        """The decoded output of `git rev-parse --abbrev-ref HEAD`, or None if that is "HEAD".

        A bare "HEAD" means git had no branch name to abbreviate to.
        """
        branch = self._git("rev-parse", "--abbrev-ref", "HEAD").decode()
        return None if branch == "HEAD" else branch

    def _fix_git_relative_path(self, worktree_path: str, relative_to: PurePath | str) -> str:
        """Re-express a worktree-relative path as a path relative to `relative_to`."""
        return str((self.worktree / worktree_path).relative_to(relative_to))

    def changed_files(
        self,
        from_commit: str | None = None,
        include_untracked: bool = False,
        relative_to: PurePath | str | None = None,
    ) -> set[str]:
        """Returns the names of changed files, expressed relative to `relative_to`.

        from_commit:        If set, also include files changed between the merge-base of this
                            commit and HEAD.
        include_untracked:  If True, also include untracked files that are not ignored.
        relative_to:        Directory used both to restrict the git queries and to relativize the
                            returned paths. Defaults to the worktree.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        rel_suffix = ["--", str(relative_to)]

        def list_files(*args: str) -> list[str]:
            return self._git(*args, *rel_suffix).decode().splitlines()

        # Uncommitted changes.
        files = set(list_files("diff", "--name-only", "HEAD"))
        if from_commit:
            # Grab the diff from the merge-base to HEAD using ... syntax.  This ensures we have just
            # the changes that have occurred on the current branch.
            files.update(list_files("diff", "--name-only", from_commit + "...HEAD"))
        if include_untracked:
            files.update(
                list_files("ls-files", "--other", "--exclude-standard", "--full-name")
            )
        # git will report changed files relative to the worktree: re-relativize to relative_to
        return {self._fix_git_relative_path(f, relative_to) for f in files}

    def changed_files_lines(
        self,
        paths: Iterable[str],
        /,
        *,
        from_commit: str | None = None,
        relative_to: PurePath | str | None = None,
        include_untracked: bool = False,
    ) -> dict[str, tuple[Hunk, ...]]:
        """Returns the changed hunks of the given files, keyed by file name.

        paths:              Files to inspect; each is joined onto `relative_to` before being
                            handed to git.
        from_commit:        If set, also diff from the merge-base of this commit to HEAD. Entries
                            from that diff replace the uncommitted entries for the same file.
        relative_to:        Base directory for `paths`. Defaults to the worktree.
        include_untracked:  If True, untracked files among `paths` are reported as a diff
                            against /dev/null.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree

        # `paths` is consumed up to three times below. Materialize it once so that a one-shot
        # iterator (e.g. a generator) is not silently empty on the later passes.
        requested_paths = tuple(paths)
        pathspecs = [str(relative_to / path) for path in requested_paths]

        result = self._git_diff("--unified=0", "HEAD", "--", *pathspecs)

        if from_commit:
            result.update(
                self._git_diff("--unified=0", from_commit + "...HEAD", "--", *pathspecs)
            )

        if include_untracked:
            # There is no git diff flag to include untracked files, so we get
            # the list of untracked files and manually create the diff by
            # comparing each file to an empty /dev/null.
            untracked_files = (
                self._git("ls-files", "--other", "--exclude-standard", "--full-name")
                .decode()
                .splitlines()
            )
            for file in set(untracked_files).intersection(requested_paths):
                untracked_diff = self._git_diff("--no-index", "/dev/null", str(relative_to / file))
                assert len(untracked_diff) == 1
                result[file] = next(iter(untracked_diff.values()))

        return result

    def _git(self, *args: str) -> bytes:
        """Run unsandboxed git command."""
        return self._git_binary._invoke_unsandboxed(self._create_git_cmdline(args))

    def _git_diff(self, *args: str) -> dict[str, tuple[Hunk, ...]]:
        """Run unsandboxed git diff command and parse the diff."""
        return self._diff_parser.parse_unified_diff(self._git("diff", *args))

    def changes_in(self, diffspec: str, relative_to: PurePath | str | None = None) -> set[str]:
        """Returns the files named by `git diff-tree` for `diffspec`, relative to `relative_to`.

        relative_to defaults to the worktree.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        files = (
            self._git("diff-tree", "--no-commit-id", "--name-only", "-r", diffspec)
            .decode()
            .splitlines()
        )
        return {self._fix_git_relative_path(f.strip(), relative_to) for f in files}

    def _create_git_cmdline(self, args: Iterable[str]) -> list[str]:
        """Prefix `args` with the options that pin git to this gitdir and worktree."""
        return [f"--git-dir={self._gitdir}", f"--work-tree={self.worktree}", *args]

    def __eq__(self, other: Any) -> bool:
        # NB: See the class doc regarding equality.
        return self is other
×
194

195

196
class ParseError(Exception):
    """Raised when a diff contains a hunk header before any file header."""
1✔
198

199

200
class DiffParser:
    """Extracts per-file hunk ranges from the output of `git diff`."""

    def parse_unified_diff(self, content: bytes) -> dict[str, tuple[Hunk, ...]]:
        """Parse unified diff output into a mapping of file name to its hunks.

        Only `diff --git` file headers and `@@ ... @@` hunk headers are interpreted; every other
        line is ignored. A file that has a header but no hunk header is reported with a single
        placeholder hunk (`left=None`, empty right block).

        Raises ParseError if a hunk header appears before any file header, and ValueError if a
        file header or hunk header cannot be interpreted.
        """
        current_file: str | None = None
        hunks: DefaultDict[str, list[Hunk]] = defaultdict(list)
        for raw_line in BytesIO(content):
            # Strip only the trailing whitespace/newline. Diff body lines always start with a
            # '+', '-' or ' ' marker; removing a leading space would let a context line whose
            # text looks like a header be mistaken for a real `diff --git` or `@@` header.
            line = raw_line.rstrip()

            if match := self._filename_regex.match(line):
                if current_file is not None:
                    # The previous file produced no hunks: record the placeholder for it.
                    hunks.setdefault(current_file, self._placeholder_hunks())
                current_file = self._parse_filename(match)
                if current_file is None:
                    raise ValueError(f"failed to parse filename from line: `{line!r}`")
                continue

            if match := self._lines_changed_regex.match(line):
                if current_file is None:
                    raise ParseError(f"missing filename in the diff:\n{content!r}")

                try:
                    hunk = self._parse_hunk(match, line)
                except ValueError as e:
                    raise ValueError(f"Failed to parse hunk: {line!r}") from e

                hunks[current_file].append(hunk)
                continue

        if current_file is not None:
            hunks.setdefault(current_file, self._placeholder_hunks())
        return {filename: tuple(file_hunks) for filename, file_hunks in hunks.items()}

    def _placeholder_hunks(self) -> list[Hunk]:
        """A fresh single-hunk list used for files that appear in the diff without hunks."""
        return [Hunk(left=None, right=TextBlock(start=0, count=0))]

    @cached_property
    def _lines_changed_regex(self) -> re.Pattern:
        """Pattern for a hunk header: `@@ -start[,count] +start[,count] @@`."""
        return re.compile(rb"^@@ -([0-9]+)(,([0-9]+))? \+([0-9]+)(,([0-9]+))? @@.*")

    def _parse_hunk(self, match: re.Match, line: bytes) -> Hunk:
        """Build a Hunk from a `_lines_changed_regex` match; an omitted count means 1."""
        g = match.groups()
        return Hunk(
            left=TextBlock(
                start=int(g[0]),
                count=int(g[2]) if g[2] is not None else 1,
            ),
            right=TextBlock(
                start=int(g[3]),
                count=int(g[5]) if g[5] is not None else 1,
            ),
        )

    @cached_property
    def _filename_regex(self) -> re.Pattern:
        """Pattern for a `diff --git a/... b/...` header, capturing the `b/` side by name."""
        # This only handles whitespaces. It doesn't work if a filename has something weird
        # in it that needs escaping, e.g. a double quote.
        # NB: every group on the `a/` side is non-capturing; only `unquoted` and `quoted` capture.
        a_file = rb'(?:a/(?:[^"]+)|"a/(?:(?:[^"]|\\")+)")'
        b_file = rb'(?:b/(?P<unquoted>[^"]+)|"b/(?P<quoted>(?:[^"]|\\")+)")'
        return re.compile(b"^diff --git " + a_file + b" " + b_file + b"$")

    def _parse_filename(self, match: re.Match) -> str | None:
        """Return the `b/` file name from a `_filename_regex` match, unescaping `\\"` if quoted."""
        unquoted = g.decode() if (g := match.group("unquoted")) is not None else None
        quoted = (
            g.decode().replace(r"\"", '"') if (g := match.group("quoted")) is not None else None
        )
        return unquoted or quoted
×
266

267

268
@dataclass(frozen=True)
class MaybeGitWorktree(EngineAwareReturnType):
    """Holds either a `GitWorktree` or the reason one could not be produced."""

    # The worktree, when one was found.
    git_worktree: GitWorktree | None = None
    # If git_worktree is None, the reason why.
    failure_reason: str | None = None

    def cacheable(self) -> bool:
        """Always False, so the engine does not cache this value."""
        return False
×
275

276

277
@dataclass(frozen=True)
class GitWorktreeRequest:
    """Parameters for locating a git worktree via `get_git_worktree`."""

    # Passed through as the `gitdir` of the resulting GitWorktree.
    gitdir: PathLike[str] | None = None
    # Directory to run the repository lookup from; the current directory is used when unset.
    subdir: PathLike[str] | None = None
1✔
281

282

283
@rule
async def get_git_worktree(
    git_worktree_request: GitWorktreeRequest,
    maybe_git_binary: MaybeGitBinary,
) -> MaybeGitWorktree:
    """Locate the git worktree for the request, or explain why there is none.

    Runs `git rev-parse --show-toplevel` (from `git_worktree_request.subdir` when it is set) and
    wraps the reported directory in a `GitWorktree`. Returns a `MaybeGitWorktree` carrying only a
    `failure_reason` when no git binary is available or the git command fails.
    """
    git_binary = maybe_git_binary.git_binary
    if not git_binary:
        return MaybeGitWorktree(failure_reason="couldn't find `git` binary")

    cmd = ["rev-parse", "--show-toplevel"]
    subdir = git_worktree_request.subdir

    try:
        if subdir:
            with pushd(str(subdir)):
                output = git_binary._invoke_unsandboxed(cmd)
        else:
            output = git_binary._invoke_unsandboxed(cmd)
    except GitBinaryException as e:
        # Report the directory that was actually probed (the requested subdir, if any) rather
        # than the process cwd, which need not be that directory once the pushd block has exited.
        probed_dir = os.path.abspath(subdir) if subdir else os.getcwd()
        failure_msg = f"no git repository at {probed_dir}: {e!r}"
        logger.info(failure_msg)
        return MaybeGitWorktree(failure_reason=failure_msg)

    git_worktree = GitWorktree(
        binary=git_binary,
        gitdir=git_worktree_request.gitdir,
        worktree=PurePath(output.decode()),
    )

    # `branch_name` runs a git subprocess, so only evaluate it when the message will be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            f"Detected git repository at {git_worktree.worktree} on branch {git_worktree.branch_name}"
        )
    return MaybeGitWorktree(git_worktree=git_worktree)
×
315

316

317
def rules():
    """Return the rules collected by `collect_rules()` as a list."""
    return list(collect_rules())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc