• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

repo-helper / southwark / 15056466000

15 May 2025 10:31PM UTC coverage: 89.7% (+3.4%) from 86.322%
15056466000

push

github

web-flow
Updated files with 'repo_helper'. (#66)

Co-authored-by: repo-helper[bot] <74742576+repo-helper[bot]@users.noreply.github.com>

479 of 534 relevant lines covered (89.7%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.31
/southwark/targit.py
1
#!/usr/bin/env python3
2
#
3
#  targit.py
4
"""
5
Archive where the changes to the contents are recorded using `git <https://git-scm.com/>`_.
6
"""
7
#
8
#  Copyright © 2020,2022 Dominic Davis-Foster <dominic@davis-foster.co.uk>
9
#
10
#  Permission is hereby granted, free of charge, to any person obtaining a copy
11
#  of this software and associated documentation files (the "Software"), to deal
12
#  in the Software without restriction, including without limitation the rights
13
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14
#  copies of the Software, and to permit persons to whom the Software is
15
#  furnished to do so, subject to the following conditions:
16
#
17
#  The above copyright notice and this permission notice shall be included in all
18
#  copies or substantial portions of the Software.
19
#
20
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
24
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
25
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
26
#  OR OTHER DEALINGS IN THE SOFTWARE.
27
#
28

29
# stdlib
30
import atexit
1✔
31
import datetime
1✔
32
import getpass
1✔
33
import os
1✔
34
import re
1✔
35
import socket
1✔
36
import tarfile
1✔
37
import time
1✔
38
from typing import Iterator, NamedTuple, Optional
1✔
39

40
# 3rd party
41
from domdf_python_tools.doctools import prettify_docstrings
1✔
42
from domdf_python_tools.paths import PathPlus, TemporaryPathPlus
1✔
43
from domdf_python_tools.typing import PathLike
1✔
44
from dulwich.objects import format_timezone
1✔
45
from dulwich.repo import Repo
1✔
46
from filelock import FileLock, Timeout
1✔
47
from typing_extensions import Literal
1✔
48

49
# this package
50
from southwark import StagedDict, status
1✔
51

52
# Public names of this module (``southwark.targit``).
__all__ = [
                "BadArchiveError",
                "Modes",
                "Status",
                "TarGit",
                "check_archive_paths",
                "SaveState",
                ]

# Type alias used to annotate the ``mode`` argument and property of TarGit.
Modes = Literal["r", "w", "a"]
"""
Valid modes for opening :class:`~.TarGit` archives in

* ``'r'`` -- Read only access. The archive must exist.
* ``'w'`` -- Read and write access. The archive must not exist.
* ``'a'`` -- Read and write access to an existing archive.
"""

# Alias of ``southwark.StagedDict``, used as the return type of ``TarGit.status()``.
Status = StagedDict
"""
Represents the dictionary returned by :meth:`TarGit.status() <.TarGit.status>`.

The values are lists of filenames, relative to the TarGit root.
"""
76

77

78
@prettify_docstrings
class SaveState(NamedTuple):
        """
        A single entry in the save history of a :class:`~.TarGit` archive.
        """

        # TODO: changed files

        #: SHA id of the commit backing this save.
        id: str  # noqa: A003  # pylint: disable=redefined-builtin

        #: Name of the user who saved the changes.
        user: str

        #: Hostname of the machine the changes were saved on.
        device: str

        #: When the changes were saved, as seconds since the epoch.
        time: float

        #: Timezone the changes were saved in, as an offset from GMT in seconds.
        timezone: int

        def format_time(self) -> str:
                """
                Render the time of the save as a string such as::

                        Thu Oct 29 2020 15:53:52 +0000

                The trailing ``+0000`` is the GMT offset of the save's timezone.
                """  # noqa: D400

                # Shift the timestamp by the stored offset so the broken-down
                # time shows the wall-clock time in the save's own timezone.
                shifted = time.gmtime(self.time + self.timezone)
                stamp = time.strftime("%a %b %d %Y %H:%M:%S", shifted)
                offset = format_timezone(self.timezone).decode("UTF-8")
                return ' '.join((stamp, offset))
114

115

116
def check_archive_paths(archive: tarfile.TarFile) -> bool:
        """
        Checks the contents of an archive to ensure it does not contain
        any filenames with absolute paths or path traversal, nor any symbolic
        or hard links whose target lies outside the archive root.

        For example, the following paths would raise an :exc:`~.BadArchiveError`:

        * ``/usr/bin/malware.sh`` -- this is an absolute path.
        * ``~/.local/bin/malware.sh`` -- this tries to put the file in the user's home directory.
        * ``../.local/bin/malware.sh`` -- this uses path traversal to try to get to a parent directory.
        * A link member pointing to ``/etc`` or ``../..`` -- a later member extracted
          "through" such a link would be written outside the extraction directory.

        .. seealso:: The warning for :meth:`tarfile.TarFile.extractall` in the Python documentation.

        :param archive: The opened archive whose members should be checked.

        :returns: :py:obj:`True` if no member was rejected.

        :raises BadArchiveError: If a member name or link target is absolute or escapes the archive root.
        """  # noqa: D400

        for member in archive.getmembers():
                member_name = member.name
                member_name_p = PathPlus(member_name)

                if member_name_p.is_absolute() or ".." in member_name_p.parts or member_name.startswith('~'):
                        raise BadArchiveError

                if member.issym():
                        # A symbolic link's target is resolved relative to the directory containing the link.
                        link_target = os.path.join(os.path.dirname(member_name), member.linkname)
                elif member.islnk():
                        # A hard link's target is the name of another member, i.e. relative to the archive root.
                        link_target = member.linkname
                else:
                        continue

                # Collapse any ``..`` components; whatever remains at the front shows whether the target escapes.
                link_target = os.path.normpath(link_target)
                escapes_root = link_target.split(os.sep)[0] == os.pardir

                if os.path.isabs(link_target) or link_target.startswith(os.sep) or escapes_root:
                        raise BadArchiveError

        return True
138

139

140
class BadArchiveError(IOError):
        """
        Raised when an archive is refused because its files utilise path traversal.
        """

        def __init__(self):
                # The message is fixed; callers raise the class without arguments.
                message = "Refusing to extract an archive containing files utilising path traversal."
                super().__init__(message)
147

148

149
class TarGit(os.PathLike):
        """
        A "TarGit" (pronounced "target", /tɑːɡɪt/) is a ``tar.gz`` archive where the changes to the contents are
        recorded using `git <https://git-scm.com/>`_.

        The archive is unpacked into a temporary directory which holds the git repository.
        :meth:`~.TarGit.save` commits any changes and writes the directory back to the archive.

        :param filename: The filename of the archive.
        :param mode: The mode to open the file in.

        :raises FileNotFoundError: If the file is opened in read or append mode, but it does not exist.
        :raises FileExistsError: If the file is opened in write mode, but it already exists.
        :raises ValueError: If an unknown value for ``mode`` is given.
        :raises OSError: If the file is opened in write or append mode and the lock file cannot be acquired.

        If opening fails, the lock and the temporary directory are released before the exception propagates.

        .. versionchanged:: 0.10.0  Can now be used as a contextmanager.
        """  # noqa: D400

        __mode: Modes
        __repo: Repo
        __lock: Optional[FileLock]

        def __init__(self, filename: PathLike, mode: Modes = 'r'):
                self.filename = PathPlus(filename)
                self.__closed: bool = True
                self.__lock = None

                self.__tmpdir: TemporaryPathPlus = TemporaryPathPlus()
                self.__tmpdir_p = self.__tmpdir.name

                # Ensures the temporary directory and lock are released at interpreter exit.
                # The registration holds a reference to this object, so ``__del__`` cannot be
                # relied upon for cleanup while the handler is registered.
                atexit.register(self.__exit_handler)

                try:
                        self.__open(mode)
                except BaseException:
                        # Release the lock, remove the temporary directory and unregister the
                        # exit handler; otherwise they would be held until the interpreter exits.
                        self.close()
                        raise

        def __open(self, mode: Modes) -> None:
                # Acquires the lock (for writable modes) and prepares the repository in the temporary directory.

                if mode in {'w', 'a'}:
                        lock_file = str(self.filename.with_suffix(self.filename.suffix + ".lock"))
                        self.__lock = FileLock(lock_file, timeout=1)
                        try:
                                self.__lock.acquire()
                        except Timeout:  # pragma: no cover
                                raise OSError(f"Unable to acquire a lock for the file '{self.filename!s}'")
                else:
                        self.__lock = None

                if mode in {'r', 'a'}:
                        if not self.exists():
                                raise FileNotFoundError(f"No such TarGit file '{self.filename!s}'")

                        with tarfile.open(
                                        self.filename,
                                        mode="r:gz",
                                        format=tarfile.PAX_FORMAT,
                                        ) as tf:
                                # Refuse archives with unsafe member names before anything is written to disk.
                                check_archive_paths(tf)
                                # NOTE(review): consider passing ``filter="data"`` on Python versions
                                # which support extraction filters -- confirm the supported versions first.
                                tf.extractall(path=self.__tmpdir_p)

                        self.__repo = Repo(self.__tmpdir_p)
                        self.__mode = mode
                        self.__closed = False

                elif mode in {'w'}:
                        if self.exists():
                                raise FileExistsError(f"TarGit file '{self.filename!s}' already exists.")

                        # Initialise git repo in tmpdir
                        self.__repo = Repo.init(self.__tmpdir_p)
                        self.__mode = mode
                        self.__closed = False
                        self.__do_commit(message="Empty initial commit.")

                else:
                        raise ValueError(f"Unknown IO mode {mode!r}")

        def save(self) -> bool:
                """
                Saves the contents of the archive.

                Does nothing if there are no changes to be saved.

                :returns: Whether there were any changes to save.

                :raises IOError: If the file is closed, or if it was opened in read-only mode.
                """

                if self.closed:
                        raise OSError("IO operation on closed TarGit file.")
                elif self.__mode not in {'w', 'a'}:
                        raise OSError("Cannot write to TarGit file opened in read-only mode.")

                # Note: status() also stages any unstaged and untracked files.
                current_status = self.status()

                if not (current_status["add"] or current_status["delete"] or current_status["modify"]):
                        return False

                # There are changes to commit
                message = "; ".join([
                                f"{len(current_status['add'])} added",
                                f"{len(current_status['delete'])} deleted",
                                f"{len(current_status['modify'])} modified",
                                ])

                self.__do_commit(message)

                # Rewrite the archive from the whole temporary directory.
                with self.filename.open("wb", buffering=False) as fp:
                        with tarfile.open(
                                        self.filename,
                                        mode="w:gz",
                                        format=tarfile.PAX_FORMAT,
                                        fileobj=fp,
                                        ) as tf:
                                tf.add(str(self.__tmpdir_p), arcname='')

                        fp.flush()

                return True

        def status(self) -> StagedDict:
                """
                Returns the status of the TarGit archive.

                The values in the dictionary are lists of filenames, relative to the TarGit root.

                In write and append modes any unstaged or untracked files are staged first.
                In read-only mode the lists are always empty.

                :raises IOError: If the file is closed.
                """

                if self.closed:
                        raise OSError("IO operation on closed TarGit file.")
                elif self.__mode not in {'w', 'a'}:
                        return {"add": [], "delete": [], "modify": []}

                current_status = status(self.__tmpdir_p)

                for file in (*current_status.unstaged, *current_status.untracked):
                        self.__repo.stage(str(file))

                # Query again so the result reflects the files staged above.
                return status(self.__tmpdir_p).staged

        def __do_commit(self, message: str) -> None:
                # Commits the staged changes, recording the current user and hostname as author and committer.

                if self.closed:  # pragma: no cover (guarded in all callers)
                        raise OSError("IO operation on closed TarGit file.")
                elif self.__mode not in {'w', 'a'}:  # pragma: no cover (guarded in all callers)
                        raise OSError("Cannot write to TarGit file opened in read-only mode.")

                login = getpass.getuser()
                # The ``history`` property parses the user and device back out of this "<user@host>" form.
                username = f"{login} <{login}@{socket.gethostname()}>"
                current_time = datetime.datetime.now(datetime.timezone.utc).astimezone()
                current_timezone = current_time.tzinfo.utcoffset(None).total_seconds()  # type: ignore[union-attr]

                self.__repo.do_commit(
                                message=message.encode("UTF-8"),
                                committer=username.encode("UTF-8"),
                                author=username.encode("UTF-8"),
                                commit_timestamp=current_time.timestamp(),
                                commit_timezone=current_timezone,
                                )

        def exists(self) -> bool:
                """
                Returns whether the :class:`~.TarGit` archive exists.
                """

                return self.filename.is_file()

        def close(self) -> None:
                """
                Closes the :class:`~.TarGit` archive.

                Removes the temporary directory, releases the lock (if any)
                and unregisters the exit handler.
                """

                self.__exit_handler()
                atexit.unregister(self.__exit_handler)

        def __exit_handler(self) -> None:
                # Releases the resources held by the archive and marks it as closed.

                if self.__tmpdir is not None:
                        self.__tmpdir.cleanup()
                if self.__lock is not None:
                        self.__lock.release()
                self.__closed = True

        @property
        def closed(self) -> bool:
                """
                Returns whether the :class:`~.TarGit` archive is closed.
                """

                return self.__closed

        @property
        def mode(self) -> Modes:
                """
                Returns the mode the :class:`~.TarGit` archive was opened in.

                This defaults to ``'r'``. After the archive is closed this will show the
                last mode until the archive is opened again.
                """

                return self.__mode

        def __truediv__(self, filename: PathLike) -> PathPlus:
                """
                Returns a :class:`~domdf_python_tools.paths.PathPlus` object
                representing the given filename relative to the archive root.

                :param filename:
                """  # noqa: D400

                return self.__tmpdir_p / filename

        def __del__(self) -> None:
                self.close()

        def __repr__(self) -> str:
                """
                Returns a string representation of the :class:`~.TarGit`.
                """

                return f"{self.__class__.__name__}({self.filename})"

        def __fspath__(self) -> str:
                """
                Returns the filename of the :class:`~.TarGit` archive.
                """

                return os.fspath(self.filename)

        def __str__(self) -> str:
                """
                Returns the filename of the :class:`~.TarGit` archive.
                """

                return self.filename.as_posix()

        @property
        def history(self) -> Iterator[SaveState]:
                """
                Returns an iterable over the historic save states of the :class:`~.TarGit`.

                Each commit yielded by the repository's walker becomes one :class:`~.SaveState`.
                The user and device are left empty if the commit's author is not in the
                ``name <user@device>`` form.

                :raises IOError: If the file is closed. As this is a generator,
                        the error is raised when iteration starts.
                """

                if self.closed:
                        raise OSError("IO operation on closed TarGit file.")

                for entry in self.__repo.get_walker():
                        # TODO: changed files

                        author_m = re.match(r".*?\s+<(.*?)@(.*?)>", entry.commit.author.decode("UTF-8"))
                        if author_m:
                                user, device = author_m.groups()
                        else:
                                user, device = '', ''

                        yield SaveState(
                                        id=entry.commit.id.decode("UTF-8"),
                                        user=user,
                                        device=device,
                                        time=entry.commit.author_time,
                                        timezone=entry.commit.author_timezone,
                                        )

        def __enter__(self) -> "TarGit":
                """
                Setup and acquire the resource and return it.
                """

                return self

        def __exit__(self, exc_type, exc_value, traceback) -> bool:  # type: ignore[return]
                """
                Shutdown and release the resource even if an error was raised.

                Nothing is returned, so exceptions raised in the ``with`` block are not suppressed.
                """

                self.close()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc