• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

repo-helper / southwark / 15056466000

15 May 2025 10:31PM UTC coverage: 89.7% (+3.4%) from 86.322%
15056466000

push

github

web-flow
Updated files with 'repo_helper'. (#66)

Co-authored-by: repo-helper[bot] <74742576+repo-helper[bot]@users.noreply.github.com>

479 of 534 relevant lines covered (89.7%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.2
/southwark/__init__.py
1
#!/usr/bin/env python3
2
#
3
#  __init__.py
4
"""
5
Extensions to the Dulwich Git library.
6
"""
7
#
8
#  Copyright © 2020 Dominic Davis-Foster <dominic@davis-foster.co.uk>
9
#
10
#  Permission is hereby granted, free of charge, to any person obtaining a copy
11
#  of this software and associated documentation files (the "Software"), to deal
12
#  in the Software without restriction, including without limitation the rights
13
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14
#  copies of the Software, and to permit persons to whom the Software is
15
#  furnished to do so, subject to the following conditions:
16
#
17
#  The above copyright notice and this permission notice shall be included in all
18
#  copies or substantial portions of the Software.
19
#
20
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
24
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
25
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
26
#  OR OTHER DEALINGS IN THE SOFTWARE.
27
#
28
#  format_commit, get_untracked_paths, get_tree_changes, clone and status
29
#  based on https://github.com/dulwich/dulwich
30
#  Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
31
#  |  Licensed under the Apache License, Version 2.0 (the "License"); you may
32
#  |  not use this file except in compliance with the License. You may obtain
33
#  |  a copy of the License at
34
#  |
35
#  |      http://www.apache.org/licenses/LICENSE-2.0
36
#  |
37
#  |  Unless required by applicable law or agreed to in writing, software
38
#  |  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
39
#  |  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
40
#  |  License for the specific language governing permissions and limitations
41
#  |  under the License.
42
#
43

44
# stdlib
45
import os
1✔
46
import shutil
1✔
47
from collections import defaultdict
1✔
48
from contextlib import closing, contextmanager
1✔
49
from itertools import chain
1✔
50
from operator import itemgetter
1✔
51
from typing import (
1✔
52
                IO,
53
                ContextManager,
54
                Dict,
55
                Iterator,
56
                List,
57
                NamedTuple,
58
                Optional,
59
                Sequence,
60
                Tuple,
61
                TypeVar,
62
                Union,
63
                overload
64
                )
65

66
# 3rd party
67
import dulwich.repo
1✔
68
from click import echo
1✔
69
from consolekit.terminal_colours import Fore
1✔
70
from domdf_python_tools.compat import nullcontext
1✔
71
from domdf_python_tools.paths import PathPlus, maybe_make, unwanted_dirs
1✔
72
from domdf_python_tools.typing import PathLike
1✔
73
from dulwich.config import StackedConfig
1✔
74
from dulwich.ignore import IgnoreFilterManager
1✔
75
from dulwich.index import Index, get_unstaged_changes
1✔
76
from dulwich.objects import Commit, ShaFile, Tag
1✔
77
from dulwich.porcelain import default_bytes_err_stream, fetch, path_to_tree_path
1✔
78
from typing_extensions import TypedDict
1✔
79

80
# this package
81
from southwark.repo import Repo
1✔
82

83
__author__: str = "Dominic Davis-Foster"
1✔
84
__copyright__: str = "2020 Dominic Davis-Foster"
1✔
85
__license__: str = "MIT License"
1✔
86
__version__: str = "0.9.0"
1✔
87
__email__: str = "dominic@davis-foster.co.uk"
1✔
88

89
__all__ = [
1✔
90
                "get_tags",
91
                "check_git_status",
92
                "assert_clean",
93
                "get_untracked_paths",
94
                "status",
95
                "StagedDict",
96
                "GitStatus",
97
                "get_tree_changes",
98
                "clone",
99
                "_DR",
100
                "open_repo_closing",
101
                "windows_clone_helper",
102
                ]
103

104
_DR = TypeVar("_DR", bound=dulwich.repo.Repo)
1✔
105

106

107
class StagedDict(TypedDict):
        """
        Staged changes, grouped by the kind of change.

        Each value is a list of filenames, relative to the repository root.

        .. versionadded:: 0.6.1
        """

        # Files which are in the index but not in the ``HEAD`` tree.
        add: List[PathPlus]

        # Files which are in the ``HEAD`` tree but not in the index.
        delete: List[PathPlus]

        # Files which are in both, but whose staged content differs.
        modify: List[PathPlus]
1✔
117

118

119
class GitStatus(NamedTuple):
        """
        The result of :func:`~.status`: staged, unstaged and untracked paths.

        .. versionadded:: 0.6.1
        """

        #: Staged paths, keyed by ``"add"``, ``"delete"`` and ``"modify"``.
        staged: StagedDict

        #: Paths with changes that have not been staged.
        unstaged: List[PathPlus]

        #: Paths which are untracked, not ignored, and not inside ``.git``.
        untracked: List[PathPlus]
1✔
134

135

136
def get_tags(repo: Union[dulwich.repo.Repo, PathLike] = '.') -> Dict[str, str]:
        """
        Returns a mapping of commit SHAs to tag names.

        Refs under ``refs/tags`` which resolve to neither a tag object
        nor a commit are left out of the mapping.

        :param repo: The repository, or the path to it.
        """

        sha_to_tag: Dict[str, str] = {}

        with open_repo_closing(repo) as r:
                tag_refs: Dict[bytes, bytes] = r.refs.as_dict(b"refs/tags")

                for tag_name, ref_sha in tag_refs.items():
                        target = r.get_object(ref_sha)

                        if isinstance(target, Tag):
                                # The ref points at a tag object; key on the SHA that tag references.
                                keyed_sha = target.object[1]
                        elif isinstance(target, Commit):
                                # The ref points straight at a commit.
                                keyed_sha = ref_sha
                        else:
                                continue

                        sha_to_tag[keyed_sha.decode("UTF-8")] = tag_name.decode("UTF-8")

        return sha_to_tag
1✔
155

156

157
def assert_clean(repo: PathPlus, allow_config: Sequence[PathLike] = ()) -> bool:
        """
        Returns :py:obj:`True` if the working directory is clean.

        If not, returns :py:obj:`False` and prints a helpful error message to stderr.

        The working directory counts as clean when every staged or unstaged file
        is listed in ``allow_config``. Untracked files are not considered.

        :param repo: The repository to check.
        :param allow_config: Filenames which are allowed to have uncommitted changes.
        """

        allowed = [PathPlus(filename) for filename in allow_config]

        stat = status(repo)

        modified_files = chain(
                        stat.staged["add"],
                        stat.staged["delete"],
                        stat.staged["modify"],
                        stat.unstaged,
                        )

        # ``all`` is vacuously true when nothing has been modified, and stops at the
        # first file that isn't allowed. (A ``chain`` object is always truthy, so it
        # cannot itself be used to test whether there are any modified files.)
        if all(filename in allowed for filename in modified_files):
                return True

        # If we get to here the directory isn't clean
        echo(Fore.RED("Git working directory is not clean:"), err=True)

        for line in format_git_status(stat):
                echo(Fore.RED(f"  {line}"), err=True)

        return False
1✔
192

193

194
# Maps each key of a staged-changes dict to the single-letter code shown for it.
status_codes: Dict[str, str] = dict(add='A', delete='D', modify='M')
199

200

201
def check_git_status(repo_path: PathLike) -> Tuple[bool, List[str]]:
        """
        Report whether the given repository has uncommitted changes.

        :param repo_path: Path to the repository root.

        :return: A tuple of whether the git working directory is clean,
                and the formatted list of uncommitted files (empty when clean).
        """

        uncommitted: List[str] = [*format_git_status(status(repo_path))]
        is_clean = not uncommitted

        return is_clean, uncommitted
1✔
222

223

224
def format_git_status(status: GitStatus) -> Iterator[str]:
        """
        Format the ``git`` status of the given repository for output to the terminal.

        Each line consists of the file's status codes (sorted, and padded to a common width)
        followed by the filename. Lines are ordered by filename.
        Untracked files are not included.

        :param status:

        :return: An iterator over the formatted list of uncommitted files.

        .. versionadded:: 0.6.1
        """

        # Accumulates the status codes for each file; a file may be both staged and unstaged.
        files: Dict[PathPlus, str] = defaultdict(str)

        for key, code in status_codes.items():
                for file in status.staged[key]:  # type: ignore[misc]
                        files[file] += code

        for file in status.unstaged:
                files[file] += 'M'

        if not files:
                # Nothing to report (and ``max`` would fail on an empty sequence).
                return

        # Width of the code column: the longest set of codes plus one space of padding.
        # Computed once, as it is the same for every row.
        longest = max(map(len, files.values())) + 1

        for file, codes in sorted(files.items(), key=itemgetter(0)):
                status_code = ''.join(sorted(codes)).ljust(longest, ' ')
                yield f"{status_code}{file!s}"
1✔
249

250

251
def get_untracked_paths(path: PathLike, index: Index) -> Iterator[str]:
        """
        Returns an iterator over the untracked files beneath ``path``.

        The yielded filenames are relative to ``path``.
        Entries named in ``unwanted_dirs`` (``.git`` etc.) are skipped,
        as are symlinks which resolve to somewhere outside ``path``.

        :param path: Path to walk.
        :param index: Index to check against.
        """

        path = str(path)

        # ``resolve()`` always gives an absolute path, so symlink targets must be compared
        # against the resolved root; ``path`` itself may be relative or contain symlinks.
        resolved_root = PathPlus(path).resolve()

        for dirpath, dirnames, filenames in os.walk(path):
                # Skip .git etc. and below.
                # ``dirnames`` must be modified in place for ``os.walk`` to prune the directories.
                dirnames[:] = [dirname for dirname in dirnames if dirname not in unwanted_dirs]

                for filename in filenames:
                        if filename in unwanted_dirs:
                                continue

                        # ``dirpath`` already starts with ``path``, so this needs no further prefix.
                        filepath = os.path.join(dirpath, filename)

                        candidate = PathPlus(filepath)
                        if candidate.is_symlink() and not candidate.resolve().is_relative_to(resolved_root):
                                continue

                        ip = path_to_tree_path(path, filepath)

                        if ip not in index:
                                yield os.path.relpath(filepath, path)
1✔
287

288

289
def get_tree_changes(repo: Union[PathLike, dulwich.repo.Repo]) -> StagedDict:
        """
        Compare the index to ``HEAD`` and report the files added, deleted and modified.

        If ``HEAD`` cannot be looked up, the index is compared against no tree at all.

        :param repo: repo path or object.

        :returns: Dictionary containing changes for each type of change.

        :raises NotImplementedError: If a change has differing old and new paths.

        .. versionadded:: 0.6.1
        """

        added: List[PathPlus] = []
        deleted: List[PathPlus] = []
        modified: List[PathPlus] = []

        with open_repo_closing(repo) as r:
                index = r.open_index()

                try:
                        tree_id = r[b'HEAD'].tree  # type: ignore[attr-defined]
                except KeyError:
                        tree_id = None

                # TODO: call out to dulwich.diff_tree somehow.
                for change in index.changes_from_tree(r.object_store, tree_id):
                        # The first element of each change holds the old and new paths.
                        old_path, new_path = change[0]

                        if not old_path:
                                added.append(PathPlus(new_path.decode("UTF-8")))
                        elif not new_path:
                                deleted.append(PathPlus(old_path.decode("UTF-8")))
                        elif old_path == new_path:
                                modified.append(PathPlus(old_path.decode("UTF-8")))
                        else:
                                raise NotImplementedError("git mv ops not yet supported")

                return {"add": added, "delete": deleted, "modify": modified}
1✔
326

327

328
def status(repo: Union[dulwich.repo.Repo, PathLike] = '.') -> GitStatus:
        """
        Returns staged, unstaged, and untracked changes relative to the HEAD.

        Untracked files matched by the repository's ignore rules are left out.

        :param repo: Path to repository or repository object.
        """

        with open_repo_closing(repo) as r:
                # Staged: the index compared against HEAD.
                staged = get_tree_changes(r)

                # Unstaged: the working tree compared against the index.
                index = r.open_index()
                checkin_normalize = r.get_blob_normalizer().checkin_normalize
                changed_paths = get_unstaged_changes(index, str(r.path), checkin_normalize)
                unstaged = [PathPlus(raw_path.decode("UTF-8")) for raw_path in changed_paths]

                # Untracked: files on disk which are neither in the index nor ignored.
                ignore_manager = IgnoreFilterManager.from_repo(r)
                untracked = [
                                PathPlus(filename)
                                for filename in get_untracked_paths(r.path, index)
                                if not ignore_manager.is_ignored(filename)
                                ]

                return GitStatus(staged=staged, unstaged=unstaged, untracked=untracked)
1✔
354

355

356
def clone(
                source: Union[str, bytes],
                target: Union[PathLike, bytes, None] = None,
                bare: bool = False,
                checkout: Optional[bool] = None,
                errstream: IO = default_bytes_err_stream,
                origin: Union[str, bytes] = "origin",
                depth: Optional[int] = None,
                **kwargs,
                ) -> Repo:
        """
        Clone a local or remote git repository.

        :param source: Path or URL for source repository.
        :param target: Path to target repository.
                Defaults to the last ``/``-separated component of ``source``.
        :param bare: Whether to create a bare repository.
        :param checkout: Whether to check-out HEAD after cloning.
                Defaults to :py:obj:`True` unless ``bare`` is set.
        :param errstream: Optional stream to write progress to.
        :param origin: Name of remote from the repository used to clone.
        :param depth: Depth to fetch at.
        :param kwargs: Additional keyword arguments passed to :func:`dulwich.porcelain.fetch`.

        :returns: The cloned repository.

        :raises TypeError: If both ``checkout`` and ``bare`` are true.

        If anything goes wrong after the target repository has been initialised
        the repository is closed, ``target`` is removed, and the exception is re-raised.

        .. versionadded:: 0.6.1

        .. versionchanged:: 0.7.2

                * ``target`` now accepts :py:data:`domdf_python_tools.typing.PathLike` objects.
                * ``origin`` now accepts :class:`str` objects.
        """

        if checkout is None:
                checkout = (not bare)
        elif checkout and bare:
                raise TypeError("'checkout' and 'bare' are incompatible.")

        # Work with text internally; values are encoded again where bytes are required.
        if isinstance(origin, bytes):
                origin = origin.decode("UTF-8")

        if isinstance(source, bytes):
                source = source.decode("UTF-8")

        if target is None:
                target = source.split('/')[-1]

        if isinstance(target, bytes):
                target = target.decode("UTF-8")

        maybe_make(target)

        if bare:
                r = Repo.init_bare(target)
        else:
                r = Repo.init(target)

        try:
                target_config = r.get_config()

                target_config.set(("remote", origin), "url", source.encode("UTF-8"))
                target_config.set(("remote", origin), "fetch", f"+refs/heads/*:refs/remotes/{origin}/*".encode("UTF-8"))
                target_config.write_to_path()
                fetch_result = fetch(
                                r,
                                origin,
                                errstream=errstream,
                                message=f"clone: from {source}".encode("UTF-8"),
                                depth=depth,
                                **kwargs,
                                )

                head: Optional[ShaFile]

                try:
                        head = r[fetch_result.refs[b'HEAD']]
                except KeyError:
                        # The remote didn't report a HEAD (or the object is missing); nothing to check out.
                        head = None
                else:
                        r[b'HEAD'] = head.id

                if checkout and not bare and head is not None:
                        errstream.write(b'Checking out ' + head.id + b'\n')
                        r.reset_index(head.tree)  # type: ignore[attr-defined]  # TODO

        except BaseException:
                # Close the repository *before* removing the directory: on platforms where
                # open files cannot be deleted the removal would otherwise fail, and
                # ``ignore_errors=True`` would hide that, leaving a partial clone behind.
                try:
                        r.close()
                finally:
                        shutil.rmtree(target, ignore_errors=True)
                raise

        return r
1✔
445

446

447
@overload
def open_repo_closing(path_or_repo: _DR) -> ContextManager[_DR]: ...


@overload
def open_repo_closing(path_or_repo: Union[str, os.PathLike]) -> ContextManager[Repo]: ...


def open_repo_closing(path_or_repo) -> ContextManager:  # noqa: MAN001
        """
        Returns a context manager yielding a repository for the given repository or path.

        An existing :class:`dulwich.repo.Repo` object is yielded unchanged, and is not closed on exit.
        When a filesystem path is given a new repository object is opened, and closed again on exit.

        .. versionadded:: 0.7.0

        :param path_or_repo: Either a :class:`dulwich.repo.Repo` object or the path of a repository.
        """

        if not isinstance(path_or_repo, dulwich.repo.BaseRepo):
                # We open the repository here, so we are responsible for closing it.
                return closing(Repo(path_or_repo))

        return nullcontext(path_or_repo)
1✔
469

470

471
@contextmanager
def windows_clone_helper() -> Iterator[None]:
        """
        Context manager to aid cloning on Windows during tests.

        While active, ``StackedConfig.default_backends`` is replaced with a function
        returning an empty list, and the ``USER`` environment variable is given
        a placeholder value if it is not already set.
        The environment and ``StackedConfig.default_backends`` are restored on exit.

        .. versionadded:: 0.8.0

        .. attention:: This function is intended only for use in tests.

        Usage:

        .. code-block:: python

                with windows_clone_helper():
                        repo = clone(...)

        """

        saved_environ = os.environ.copy()
        saved_backends = StackedConfig.default_backends

        try:
                StackedConfig.default_backends = lambda *args: []  # type: ignore[assignment]
                os.environ.setdefault("USER", "wordle_user")

                yield

        finally:
                # Put the environment back exactly as it was, dropping anything added in the meantime.
                os.environ.clear()
                os.environ.update(saved_environ)
                StackedConfig.default_backends = saved_backends  # type: ignore[assignment]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc