• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25441711719

06 May 2026 02:31PM UTC coverage: 92.915%. Remained the same
25441711719

push

github

web-flow
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Action best practices we recently enabled at #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next follow-up would then be upgrades and
harmonizing the generated and non-generated pins.

Notice: This idea was suggested by Claude while going over pinact output,
and I was surprised to see that post-processing the YAML wasn't too
gross.

92206 of 99237 relevant lines covered (92.91%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.57
/src/python/pants/vcs/git.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import dataclasses
12✔
7
import logging
12✔
8
import os
12✔
9
import re
12✔
10
from collections import defaultdict
12✔
11
from collections.abc import Iterable
12✔
12
from dataclasses import dataclass
12✔
13
from functools import cached_property
12✔
14
from io import BytesIO
12✔
15
from os import PathLike
12✔
16
from pathlib import Path, PurePath
12✔
17
from typing import Any, DefaultDict
12✔
18

19
from pants.core.util_rules.system_binaries import GitBinary, GitBinaryException, MaybeGitBinary
12✔
20
from pants.engine.engine_aware import EngineAwareReturnType
12✔
21
from pants.engine.rules import collect_rules, rule
12✔
22
from pants.util.contextutil import pushd
12✔
23
from pants.vcs.change import ChangedFile, ChangeType
12✔
24
from pants.vcs.hunk import Hunk, TextBlock
12✔
25

26
logger = logging.getLogger(__name__)
12✔
27

28

29
class GitWorktree(EngineAwareReturnType):
    """Implements a safe wrapper for un-sandboxed access to Git in the user's working copy.

    This type (and any wrappers) should be marked `EngineAwareReturnType.cacheable=False`, because
    it internally uses un-sandboxed APIs, and `@rules` which produce it should re-run in each
    session. It additionally implements a default `__eq__` in order to prevent early-cutoff in the
    graph, and force any consumers of the type to re-run.
    """

    worktree: PurePath
    _gitdir: PurePath
    _git_binary: GitBinary

    def __init__(
        self,
        binary: GitBinary,
        worktree: PathLike[str] | None = None,
        gitdir: PathLike[str] | None = None,
    ) -> None:
        """Creates a git object that assumes the git repository is in the cwd by default.

        binary:    The git binary to use.
        worktree:  The path to the git repository working tree directory (typically '.').
        gitdir:    The path to the repository's git metadata directory (typically '.git').
        """
        self.worktree = Path(worktree or os.getcwd()).resolve()
        # Without an explicit gitdir, fall back to `<worktree>/.git`.
        self._gitdir = Path(gitdir).resolve() if gitdir else (self.worktree / ".git")
        self._git_binary = binary
        self._diff_parser = DiffParser()

    def cacheable(self) -> bool:
        """Always False: see the class doc regarding un-sandboxed access."""
        return False

    @property
    def current_rev_identifier(self) -> str:
        """The symbolic name used for the current revision."""
        return "HEAD"

    @property
    def commit_id(self) -> str:
        """The decoded output of `git rev-parse HEAD`."""
        return self._git("rev-parse", "HEAD").decode()

    @property
    def branch_name(self) -> str | None:
        """The decoded output of `git rev-parse --abbrev-ref HEAD`, or None if that is "HEAD"."""
        branch = self._git("rev-parse", "--abbrev-ref", "HEAD").decode()
        return None if branch == "HEAD" else branch

    def _fix_git_relative_path(self, worktree_path: str, relative_to: PurePath | str) -> str:
        """Re-express a worktree-relative path as a path relative to `relative_to`.

        Raises ValueError (from `PurePath.relative_to`) if the file is not under `relative_to`.
        """
        return str((self.worktree / worktree_path).relative_to(relative_to))

    def changed_files(
        self,
        from_commit: str | None = None,
        include_untracked: bool = False,
        relative_to: PurePath | str | None = None,
    ) -> set[ChangedFile]:
        """Collect changed files, with their change type, limited to `relative_to`.

        Uncommitted changes against HEAD are always included. If `from_commit` is given, changes
        between its merge-base with HEAD and HEAD are added. If `include_untracked` is set,
        untracked (and not ignored) files are added as `ChangeType.ADDED`. When a path shows up
        in more than one of these sources, the later source in that order wins.

        from_commit:        Optional revision to diff from (using `...HEAD`).
        include_untracked:  Whether to also report untracked files.
        relative_to:        Directory that limits the query and that returned paths are made
                            relative to; defaults to the worktree.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        rel_suffix = ["--", str(relative_to)]

        files: dict[str, ChangeType] = {}

        uncommitted_changes = self._git(
            "diff", "--name-status", "--no-renames", "HEAD", *rel_suffix
        )
        files.update(self._parse_name_status(uncommitted_changes))

        if from_commit:
            # Grab the diff from the merge-base to HEAD using ... syntax.  This ensures we have just
            # the changes that have occurred on the current branch.
            committed_changes = self._git(
                "diff", "--name-status", "--no-renames", from_commit + "...HEAD", *rel_suffix
            )
            files.update(self._parse_name_status(committed_changes))

        if include_untracked:
            untracked = self._git(
                "ls-files", "--other", "--exclude-standard", "--full-name", *rel_suffix
            )
            files.update(
                {path: ChangeType.ADDED for path in untracked.decode().splitlines() if path}
            )

        # git will report changed files relative to the worktree: re-relativize to relative_to
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in files.items()
        }

    @staticmethod
    def _parse_name_status(output: bytes) -> dict[str, ChangeType]:
        """Parse `git diff --name-status` output into a dict of path -> ChangeType."""
        result: dict[str, ChangeType] = {}
        for line in output.decode().splitlines():
            if not line:
                continue
            # Each line is `<status>\t<path>`; only the first status letter is significant here.
            status, _, path = line.partition("\t")
            letter = status[0].upper() if status else "M"
            try:
                change_type = ChangeType(letter)
            except ValueError:
                # Git may report various other esoteric statuses.
                # We treat anything that isn't A, D or M as if it were M.
                change_type = ChangeType.MODIFIED
            result[path] = change_type
        return result

    def changed_files_lines(
        self,
        paths: Iterable[str],
        /,
        *,
        from_commit: str | None = None,
        relative_to: PurePath | str | None = None,
        include_untracked: bool = False,
    ) -> dict[str, tuple[Hunk, ...]]:
        """Compute the changed hunks for the given files.

        paths:              Files to diff, joined onto `relative_to` before being passed to git.
                            Any iterable is accepted, including one-shot iterators.
        from_commit:        Optional revision; if given, hunks from `from_commit...HEAD` are
                            merged over the hunks of the diff against HEAD.
        relative_to:        Base directory for `paths`; defaults to the worktree.
        include_untracked:  Whether to synthesize a diff against /dev/null for requested files
                            that git lists as untracked.

        Returns a dict of filename -> hunks, as produced by `DiffParser.parse_unified_diff`.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree

        # `paths` is consumed up to three times below. Materialize it once, so that a one-shot
        # iterator is not exhausted by the first diff (an empty pathspec would make the later
        # `git diff` calls cover the entire repository).
        requested = tuple(paths)
        pathspecs = [str(relative_to / path) for path in requested]

        result = self._git_diff("--unified=0", "HEAD", "--", *pathspecs)

        if from_commit:
            result.update(
                self._git_diff("--unified=0", from_commit + "...HEAD", "--", *pathspecs)
            )

        if include_untracked:
            # There is no git diff flag to include untracked files, so we get
            # the list of untracked files and manually create the diff by
            # comparing each file to an empty /dev/null.
            untracked_files = (
                self._git("ls-files", "--other", "--exclude-standard", "--full-name")
                .decode()
                .splitlines()
            )
            for file in set(untracked_files).intersection(requested):
                untracked_diff = self._git_diff("--no-index", "/dev/null", str(relative_to / file))
                assert len(untracked_diff) == 1
                # Key the entry by the requested path rather than by the name parsed from the diff.
                result[file] = next(iter(untracked_diff.values()))

        return result

    def _git(self, *args: str) -> bytes:
        """Run unsandboxed git command."""
        return self._git_binary._invoke_unsandboxed(self._create_git_cmdline(args))

    def _git_diff(self, *args: str) -> dict[str, tuple[Hunk, ...]]:
        """Run unsandboxed git diff command and parse the diff."""
        return self._diff_parser.parse_unified_diff(self._git("diff", *args))

    def changes_in(
        self, diffspec: str, relative_to: PurePath | str | None = None
    ) -> set[ChangedFile]:
        """Collect the files changed in `diffspec`, as reported by `git diff-tree`.

        diffspec:     The revision spec handed to `git diff-tree`.
        relative_to:  Directory that returned paths are made relative to; defaults to the
                      worktree.
        """
        relative_to = PurePath(relative_to) if relative_to is not None else self.worktree
        output = self._git(
            "diff-tree", "--no-commit-id", "--name-status", "--no-renames", "-r", diffspec
        )
        return {
            ChangedFile(
                path=self._fix_git_relative_path(path, relative_to),
                change_type=change_type,
            )
            for path, change_type in self._parse_name_status(output).items()
        }

    def _create_git_cmdline(self, args: Iterable[str]) -> list[str]:
        """Prefix `args` with the `--git-dir` and `--work-tree` options for this repository."""
        return [f"--git-dir={self._gitdir}", f"--work-tree={self.worktree}", *args]

    def __eq__(self, other: Any) -> bool:
        # NB: See the class doc regarding equality.
        return id(self) == id(other)
1✔
231

232

233
class ParseError(Exception):
    """Raised when diff output cannot be parsed."""
12✔
235

236

237
class DiffParser:
    """Extracts per-file hunk ranges from the raw bytes of `git diff` output."""

    def parse_unified_diff(self, content: bytes) -> dict[str, tuple[Hunk, ...]]:
        """Parse diff output into a dict of filename -> hunks.

        Only `diff --git ...` header lines and `@@ ... @@` hunk header lines are inspected; all
        other lines are skipped. A file that has a header but no hunk lines gets a single
        placeholder hunk (`left=None`, empty right block), so it still appears in the result.

        Raises ParseError if a hunk header appears before any file header, and ValueError if a
        header line matches but cannot be converted.
        """
        buf = BytesIO(content)
        current_file = None
        hunks: DefaultDict[str, list[Hunk]] = defaultdict(list)
        for line in buf:
            line = line.strip()

            if match := self._filename_regex.match(line):
                if current_file is not None:
                    # The previous file is finished: record a placeholder if it had no hunks.
                    # mypy false positive: https://github.com/python/mypy/issues/14987
                    hunks.setdefault(
                        current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))]
                    )
                current_file = self._parse_filename(match)
                if current_file is None:
                    raise ValueError(f"failed to parse filename from line: `{line!r}`")
                continue

            if match := self._lines_changed_regex.match(line):
                if current_file is None:
                    raise ParseError(f"missing filename in the diff:\n{content!r}")

                try:
                    hunk = self._parse_hunk(match, line)
                except ValueError as e:
                    raise ValueError(f"Failed to parse hunk: {line!r}") from e

                hunks[current_file].append(hunk)
                continue

        # Same placeholder treatment for the last file in the diff.
        if current_file is not None:
            hunks.setdefault(current_file, [Hunk(left=None, right=TextBlock(start=0, count=0))])
        return {filename: tuple(file_hunks) for filename, file_hunks in hunks.items()}

    @cached_property
    def _lines_changed_regex(self) -> re.Pattern:
        """Matches a hunk header, `@@ -start[,count] +start[,count] @@`.

        Groups 1 and 4 are the start lines; groups 3 and 6 are the optional counts.
        """
        return re.compile(rb"^@@ -([0-9]+)(,([0-9]+))? \+([0-9]+)(,([0-9]+))? @@.*")

    def _parse_hunk(self, match: re.Match, line: bytes) -> Hunk:
        """Build a Hunk from a `_lines_changed_regex` match; an omitted count means 1."""
        g = match.groups()
        return Hunk(
            left=TextBlock(
                start=int(g[0]),
                count=int(g[2]) if g[2] is not None else 1,
            ),
            right=TextBlock(
                start=int(g[3]),
                count=int(g[5]) if g[5] is not None else 1,
            ),
        )

    @cached_property
    def _filename_regex(self) -> re.Pattern:
        """Matches a `diff --git a/<path> b/<path>` header, with optionally quoted paths.

        The `b/` path is captured in the named group `unquoted` or `quoted`.
        """
        # This only handles whitespaces. It doesn't work if a filename has something weird
        # in it that needs escaping, e.g. a double quote.
        #
        # NB: Every group on the `a/` side is non-capturing (`(?:...)`), so the named groups on
        # the `b/` side are the only captures.
        a_file = rb'(?:a/[^"]+|"a/(?:[^"]|\\")+")'
        b_file = rb'(?:b/(?P<unquoted>[^"]+)|"b/(?P<quoted>(?:[^"]|\\")+)")'
        return re.compile(b"^diff --git " + a_file + b" " + b_file + b"$")

    def _parse_filename(self, match: re.Match) -> str | None:
        """Return the decoded `b/` path from a `_filename_regex` match.

        For a quoted path, `\\"` sequences are unescaped to `"`.
        """
        unquoted = g.decode() if (g := match.group("unquoted")) is not None else None
        quoted = (
            g.decode().replace(r"\"", '"') if (g := match.group("quoted")) is not None else None
        )
        return unquoted or quoted
1✔
303

304

305
@dataclasses.dataclass(frozen=True)
class MaybeGitWorktree(EngineAwareReturnType):
    """Holds either a `GitWorktree` or the reason why none is available."""

    git_worktree: GitWorktree | None = None
    # If git_worktree is None, the reason why.
    failure_reason: str | None = None

    def cacheable(self) -> bool:
        """Always False, so this value is not cached."""
        return False
2✔
312

313

314
@dataclass(frozen=True)
class GitWorktreeRequest:
    """Parameters for locating a git worktree; both fields are optional."""

    # Explicit git metadata directory, if any.
    gitdir: PathLike[str] | None = None
    # Directory from which to look up the repository, if any.
    subdir: PathLike[str] | None = None
12✔
318

319

320
@rule
async def get_git_worktree(
    git_worktree_request: GitWorktreeRequest,
    maybe_git_binary: MaybeGitBinary,
) -> MaybeGitWorktree:
    """Locate the git worktree for a request.

    Returns a `MaybeGitWorktree` carrying a `failure_reason` when there is no git binary, or when
    `git rev-parse --show-toplevel` raises `GitBinaryException`. Otherwise returns one wrapping a
    `GitWorktree` rooted at the directory git reported.
    """
    git_binary = maybe_git_binary.git_binary
    if not git_binary:
        return MaybeGitWorktree(failure_reason="couldn't find `git` binary")

    toplevel_cmd = ["rev-parse", "--show-toplevel"]
    subdir = git_worktree_request.subdir

    try:
        if subdir:
            # Run the lookup from inside the requested subdirectory.
            with pushd(str(subdir)):
                toplevel = git_binary._invoke_unsandboxed(toplevel_cmd)
        else:
            toplevel = git_binary._invoke_unsandboxed(toplevel_cmd)
    except GitBinaryException as e:
        # NOTE(review): the message reports os.getcwd(), not the requested subdir -- confirm
        # that this is intended.
        failure_msg = f"no git repository at {os.getcwd()}: {e!r}"
        logger.info(failure_msg)
        return MaybeGitWorktree(failure_reason=failure_msg)

    detected = GitWorktree(
        binary=git_binary,
        gitdir=git_worktree_request.gitdir,
        worktree=PurePath(toplevel.decode()),
    )

    logger.debug(
        f"Detected git repository at {detected.worktree} on branch {detected.branch_name}"
    )
    return MaybeGitWorktree(git_worktree=detected)
2✔
352

353

354
def rules():
    """Return the rules collected from this module as a list."""
    return list(collect_rules())
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc