• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 7796321857

06 Feb 2024 07:49AM UTC coverage: 85.162% (-0.009%) from 85.171%
7796321857

Pull #3700

github

web-flow
Merge 5dcd46c70 into e98f11228
Pull Request #3700: chore: combined dependency update

26659 of 31304 relevant lines covered (85.16%)

3.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.91
/renku/core/util/git.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Git utility functions."""
8✔
17

18
import contextlib
8✔
19
import os
8✔
20
import posixpath
8✔
21
import re
8✔
22
import shutil
8✔
23
import sys
8✔
24
import time
8✔
25
import urllib
8✔
26
from functools import reduce
8✔
27
from pathlib import Path
8✔
28
from subprocess import PIPE, SubprocessError, run
8✔
29
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast
8✔
30
from uuid import uuid4
8✔
31

32
import git
8✔
33

34
from renku.core import errors
8✔
35
from renku.infrastructure.repository import DiffChangeType
8✔
36

37
if TYPE_CHECKING:
8✔
38
    from renku.domain_model.entity import Collection, Entity
×
39
    from renku.domain_model.git import GitURL
×
40
    from renku.domain_model.provenance.agent import Person, SoftwareAgent
×
41
    from renku.infrastructure.repository import Commit, Remote, Repository
×
42

43
COMMIT_DIFF_STRATEGY = "DIFF"
8✔
44
STARTED_AT = int(time.time() * 1e3)
8✔
45

46
BRANCH_NAME_LIMIT = 250
8✔
47
CLI_GITLAB_ENDPOINT = "repos"
8✔
48
PROTECTED_BRANCH_PREFIX = "renku/autobranch"
8✔
49
RENKU_BACKUP_PREFIX = "renku-backup"
8✔
50

51

52
# TODO: Make sure caching is thread-safe
53
_entity_cache: Dict[Tuple[Optional[str], str], Union["Entity", "Collection"]] = {}
8✔
54

55

56
def run_command(command, *paths, separator=None, **kwargs):
    """Execute command by splitting `paths` to make sure that argument list will be within os limits.

    Args:
        command: A list or tuple containing command and its arguments.
        *paths: Paths to run on.
        separator: Separator for `paths` if they need to be passed as string. (Default value = None)
        **kwargs: Passed through unchanged to ``subprocess.run``.
    Raises:
        errors.GitError: If a Git subcommand failed.
    Returns:
        Result of last invocation, or ``None`` if no batch was executed.

    """
    from renku.infrastructure.repository import split_paths

    # NOTE: Normalize once so that a tuple ``command`` can be concatenated with each batch of paths.
    base_command = list(command)
    result = None

    for batch in split_paths(*paths):
        # NOTE: With a separator the whole batch is passed as one single argument.
        arguments = [separator.join(batch)] if separator else list(batch)

        try:
            result = run(base_command + arguments, **kwargs)
        except SubprocessError as e:
            raise errors.GitError(f"Cannot run command {command} : {e}") from e

        # NOTE: Stop at the first failing batch; the failing result is what gets returned.
        if result.returncode != 0:
            break

    return result
6✔
90

91

92
def is_valid_git_repository(repository: Optional["Repository"]) -> bool:
    """Tell whether a repository object was given and its HEAD is valid.

    Args:
        repository(Optional[Repository]): The repository to check.

    Returns:
        bool: ``False`` when no repository is passed, otherwise the validity of its HEAD.

    """
    if repository is None:
        return False

    return repository.head.is_valid()
×
103

104

105
def get_hook_path(path: Path, name: str) -> Path:
    """Build the location of a named hook inside a repository's ``.git/hooks`` directory.

    Args:
        path(Path): The current Git repository's path.
        name(str): The name of the hook.

    Returns:
        Path: Path to the hook.

    """
    hooks_directory = path / ".git" / "hooks"
    return hooks_directory / name
8✔
117

118

119
def get_oauth_url(url, gitlab_token):
    """Format URL with a username and password.

    Any credentials that are already embedded in ``url`` are replaced by the OAuth2 ones.

    Args:
        url: The URL to format.
        gitlab_token: The Gitlab OAuth2 Token.
    Raises:
        ValueError: If ``url`` has no network location part.
    Returns:
        The URL with credentials added.
    """
    # NOTE: A bare ``import urllib`` does not guarantee that the ``parse`` submodule is loaded.
    from urllib.parse import urlparse

    parsed_url = urlparse(url)

    if not parsed_url.netloc:
        raise ValueError(f"Invalid http git url: {url}")

    # NOTE: Drop existing ``user:password@`` info, otherwise the result would contain two credential parts.
    host = parsed_url.netloc.rpartition("@")[2]
    netloc = f"oauth2:{gitlab_token}@{host}"
    return parsed_url._replace(netloc=netloc).geturl()
1✔
136

137

138
def get_cache_directory_for_repository(url) -> Path:
    """Compute the cache directory of a repository below the project's metadata path.

    Args:
        url: The repository URL.

    Returns:
        Path: The path of the cache.

    """
    from renku.core.constant import CACHE
    from renku.domain_model.project_context import project_context

    cache_root = project_context.metadata_path / CACHE
    return cache_root / get_full_repository_path(url)
2✔
152

153

154
def parse_git_url(url: Optional[str]) -> "GitURL":
    """Parse a git URL into a ``GitURL`` object.

    Args:
        url(Optional[str]): The URL to parse.
    Raises:
        errors.InvalidGitURL: If ``url`` is empty.
    Returns:
        GitURL: The parsed GitURL.

    """
    from renku.domain_model.git import GitURL

    if url:
        return GitURL.parse(url)

    raise errors.InvalidGitURL("No URL provided.")
5✔
171

172

173
def have_same_remote(url1, url2) -> bool:
    """Check whether two git URLs have the same hostname and path, ignoring protocol and credentials.

    Args:
        url1: The first URL.
        url2: The second URL.

    Returns:
        bool: True if both URLs point to the same repository; False also when parsing fails.
    """
    try:
        first, second = parse_git_url(url1), parse_git_url(url2)
        same_host = first.hostname == second.hostname
        return same_host and first.path == second.path
    except (errors.GitError, AttributeError):
        return False
×
190

191

192
def get_renku_repo_url(remote_url, deployment_hostname=None, access_token=None):
    """Return a repo url that can be authenticated by renku.

    Args:
        remote_url: The repository URL.
        deployment_hostname: The host name used by this deployment (Default value = None).
        access_token: The OAuth2 access token (Default value = None).
    Returns:
        The Renku repository URL with credentials.
    """
    # NOTE: A bare ``import urllib`` does not guarantee that the ``parse`` submodule is loaded.
    import urllib.parse

    parsed_remote = parse_git_url(remote_url)
    path = parsed_remote.path.strip("/")
    # NOTE: Remove only the leading ``gitlab/`` segment; later occurrences are part of the project path.
    if path.startswith("gitlab/"):
        path = path.replace("gitlab/", "", 1)
    if not path.startswith(f"{CLI_GITLAB_ENDPOINT}/"):
        path = posixpath.join(CLI_GITLAB_ENDPOINT, path)

    credentials = f"renku:{access_token}@" if access_token else ""
    hostname = deployment_hostname or parsed_remote.hostname

    # NOTE: Same idea for the hostname: strip a leading ``gitlab.`` label only.
    if hostname.startswith("gitlab."):
        hostname = hostname.replace("gitlab.", "", 1)

    return urllib.parse.urljoin(f"https://{credentials}{hostname}", path)
3✔
216

217

218
def create_backup_remote(repository: "Repository", remote_name: str, url: str) -> Tuple[str, bool, Optional["Remote"]]:
    """Add a backup remote for ``remote_name`` pointing to ``url`` unless one already exists.

    Args:
        repository(Repository): The current repository.
        remote_name(str): The name of the remote to create a backup for.
        url(str): The remote URL.

    Returns:
        Tuple[str, bool, Optional[Remote]]: Tuple of backup remote name, whether it existed already and the created
            remote if successful.
    """
    backup_remote_name = f"{RENKU_BACKUP_PREFIX}-{remote_name}"

    for existing in repository.remotes:
        if backup_remote_name == existing.name:
            return backup_remote_name, True, None

    try:
        created = repository.remotes.add(name=backup_remote_name, url=url)
    except errors.GitCommandError:
        # NOTE: Adding failed; report the name without a remote object.
        created = None

    return backup_remote_name, False, created
2✔
242

243

244
def set_git_credential_helper(repository: "Repository", hostname):
    """Write the ``credential.helper`` setting so that it invokes ``renku credentials`` for ``hostname``.

    Args:
        repository(Repository): The repository whose configuration is modified.
        hostname: The hostname passed to the credential helper command.
    """
    helper_command = f"!renku credentials --hostname {hostname}"

    with repository.get_configuration(writable=True) as configuration:
        configuration.set_value("credential", "helper", helper_command)
2✔
248

249

250
def get_full_repository_path(url: Optional[str]) -> str:
    """Join the hostname and the path of a git repository URL.

    Args:
        url(Optional[str]): The URL.

    Returns:
        The hostname plus path extracted from the URL, or an empty string if ``url`` is ``None``.
    """
    if url is not None:
        git_url = parse_git_url(url)
        return posixpath.join(git_url.hostname, git_url.path)  # type:ignore

    return ""
2✔
264

265

266
def get_repository_name(url: str) -> str:
    """Derive a repository's name from the last component of its Renku URL, without suffix.

    Args:
        url(str): The URL to get the repository name from.

    Returns:
        str: The repository name.
    """
    renku_url = get_renku_repo_url(url)
    return Path(renku_url).stem
3✔
276

277

278
def get_committer_agent(commit: "Commit") -> "SoftwareAgent":
    """Build a SoftwareAgent from a commit's committer (email as id, name as name).

    Args:
        commit(Commit): The commit to check.

    Returns:
        SoftwareAgent: The agent responsible for the commit.
    """
    from renku.domain_model.provenance.agent import SoftwareAgent

    committer = commit.committer
    return SoftwareAgent(id=committer.email, name=committer.name)
×
290

291

292
def get_git_user(repository: Optional["Repository"]) -> Optional["Person"]:
    """Build a Person from the user returned by the repository.

    Args:
        repository(Optional[Repository]): The Git repository.

    Returns:
        Optional[Person]: The person associated with the repository, ``None`` if no repository is given.

    """
    from renku.domain_model.provenance.agent import Person

    if repository is not None:
        git_user = repository.get_user()
        return Person(name=git_user.name, email=git_user.email)

    return None
8✔
309

310

311
def get_remote(
    repository: Optional["Repository"], *, name: Optional[str] = None, url: Optional[str] = None
) -> Optional["Remote"]:
    """Look up a remote by name or URL, falling back to a default remote if neither is given.

    The fallback is the only remote if there is exactly one, otherwise the remote of the active branch's
    remote branch.

    Args:
        repository(Optional[Repository]): The Git repository.
        name(str, optional): The name of the remote (Default value = None).
        url(str, optional): The remote URL (Default value = None).

    Returns:
        Optional[Remote]: The remote, if found.

    """
    if not repository or len(repository.remotes) == 0:
        return None

    # NOTE: ``name`` takes precedence over ``url``; no fallback is attempted when the lookup fails.
    if name:
        for candidate in repository.remotes:
            if candidate.name == name:
                return candidate
        return None

    if url:
        for candidate in repository.remotes:
            if candidate.url == url:
                return candidate
        return None

    if len(repository.remotes) == 1:
        return repository.remotes[0]

    if repository.active_branch and repository.active_branch.remote_branch:
        return repository.active_branch.remote_branch.remote

    return None
2✔
337

338

339
def check_global_git_user_is_configured():
    """Check that git user information is configured.

    Returns:
        ``True`` if the global user can be read, ``False`` if that raises a configuration error.
    """
    from renku.infrastructure.repository import Repository

    try:
        Repository.get_global_user()
    except errors.GitConfigurationError:
        return False

    return True
7✔
349

350

351
def is_path_safe(path: Union[Path, str]) -> bool:
    """Decide whether a path may be used in output.

    Args:
        path(Union[Path, str]): The path to check.

    Returns:
        bool: True if the path is safe else False.
    """
    path_str = str(path)

    # NOTE: Git's own bookkeeping files are never considered safe.
    is_git_metadata_file = path_str in (".gitignore", ".gitattributes")
    # NOTE: Neither is anything whose path starts with ``.renku``.
    is_renku_internal = path_str.startswith(".renku")

    return not (is_git_metadata_file or is_renku_internal)
4✔
371

372

373
def get_entity_from_revision(
    repository: "Repository",
    path: Union[Path, str],
    revision: Optional[str] = None,
    bypass_cache: bool = False,
    checksum: Optional[str] = None,
) -> "Entity":
    """Create (or fetch from the module-level cache) the Entity for a path at a revision.

    Directories become a ``Collection`` whose members are built recursively from the directory's content on disk.

    Args:
        repository(Repository): The current repository.
        path(Union[Path, str]): The path of the entity.
        revision(str, optional): The revision to check at (Default value = None).
        bypass_cache(bool): Whether to ignore cached entries and get information from disk (Default value = False).
        checksum(str, optional): Pre-calculated checksum for performance reasons, will be calculated if not set.
    Returns:
        Entity: The Entity for the given path and revision.

    """
    from renku.domain_model.constant import NON_EXISTING_ENTITY_CHECKSUM
    from renku.domain_model.entity import Collection, Entity

    def get_directory_members(absolute_path: Path) -> List[Entity]:
        """Return first-level files/directories in a directory."""
        members: List[Entity] = []

        for child in absolute_path.iterdir():
            if child.name == ".gitkeep":
                continue

            relative_path = child.relative_to(repository.path)

            assert all(relative_path != m.path for m in members)

            # NOTE: ``revision`` is read from the enclosing scope, i.e. after "HEAD" was resolved below.
            member_entity = get_entity_from_revision(repository, relative_path, revision, bypass_cache=bypass_cache)
            # NOTE: If a path is not found at a revision we assume that it didn't exist at that revision
            if member_entity:
                members.append(member_entity)

        return members

    # NOTE: The cache key uses the revision exactly as passed in (before "HEAD" is resolved).
    cache_key = (revision, str(path))
    if not bypass_cache:
        cached_entity = _entity_cache.get(cache_key)
        if cached_entity:
            return cached_entity

    # NOTE: For untracked directory the hash is None; make sure to stage them first before calling this function.
    if not checksum:
        checksum = repository.get_object_hash(revision=revision, path=path)

    # NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
    # entity and use revision as checksum
    if isinstance(revision, str) and revision == "HEAD":
        revision = repository.head.commit.hexsha
    checksum = checksum or revision or NON_EXISTING_ENTITY_CHECKSUM
    entity_id = Entity.generate_id(checksum=checksum, path=path)

    absolute_path = repository.path / path
    is_directory = str(path) != "." and absolute_path.is_dir()

    entity: Union[Entity, Collection]
    if is_directory:
        members = get_directory_members(absolute_path)
        entity = Collection(id=entity_id, checksum=checksum, path=path, members=members)
    else:
        entity = Entity(id=entity_id, checksum=checksum, path=path)

    _entity_cache[cache_key] = entity

    return entity
6✔
440

441

442
def get_git_path(path: Union[Path, str] = ".") -> Path:
    """Return the resolved path of the repository containing ``path``, or ``path`` itself if there is none.

    Args:
        path(Union[Path, str]): Path to start from (Default value = ".").

    Returns:
        Path: The resolved repository path, or the resolved input path when no repository is found.
    """
    # TODO: Implement this using ``git rev-parse --git-dir``
    try:
        repository = get_git_repository(path=path)
    except ValueError:
        return Path(path).resolve()

    return repository.path.resolve()
8✔
451

452

453
def get_git_repository(path: Union[Path, str] = ".") -> "Repository":
    """Open the Git repository located at ``path`` or at any of its parents.

    Args:
        path: Path to start from (Default value = ".").
    Raises:
        ValueError: If not inside a git repository.
    Returns:
        Git repository
    """
    from renku.infrastructure.repository import Repository

    try:
        repository = Repository(path, search_parent_directories=True)
    except errors.GitError:
        raise ValueError(f"Cannot find a git repository at '{path}'")

    return repository
7✔
469

470

471
def commit_changes(*paths: Union[Path, str], repository: "Repository", message=None) -> List[str]:
    """Commit paths to the repository.

    Paths that are not staged yet are added first; then everything that is staged gets committed.

    Args:
        *paths(Union[Path, str]): The paths to commit.
        repository(Repository): The repository to commit to.
        message: The commit message; generated from the saved paths if empty (Default value = None).
    Raises:
        errors.GitError: If paths couldn't be committed.
    Returns:
        List of paths that were committed.
    """
    if len(paths) == 0:
        return []

    try:
        staged_files = {c.a_path for c in repository.staged_changes} if repository.head.is_valid() else set()
        path_to_save = set(paths) - staged_files
        repository.add(*path_to_save)
        saved_paths = [c.b_path for c in repository.staged_changes]

        # NOTE: Nothing staged means nothing to commit.
        if saved_paths:
            if not message:
                message = _build_saved_changes_message(saved_paths)

            repository.commit(message)
    except errors.GitCommandError as e:
        raise errors.GitError("Cannot commit changes") from e
    else:
        return saved_paths


def _build_saved_changes_message(saved_paths: List[str], max_line_len: int = 100, max_total_len: int = 100000) -> str:
    """Build the default commit message that lists the saved paths.

    Args:
        saved_paths(List[str]): The paths to list in the message.
        max_line_len(int): Cumulative length after which each path is put on its own line (Default value = 100).
        max_total_len(int): Maximum length of the whole message (Default value = 100000).
    Returns:
        str: The commit message, truncated with a trailing "..." if it exceeds ``max_total_len``.
    """
    prefix = "Saved changes to: "

    # NOTE: Keep a running total instead of rebuilding a list per path; only the lengths of the prefix and of the
    # paths are counted (separators are not).
    running_len = len(prefix)
    parts = []
    for saved_path in saved_paths:
        running_len += len(saved_path)
        # limit first line to max_line_len characters
        parts.append(saved_path if running_len < max_line_len else "\n\t" + saved_path)

    message = prefix + " ".join(parts)

    if len(message) > max_total_len:
        message = message[: max_total_len - 3] + "..."

    return message
2✔
517

518

519
def push_changes(repository: "Repository", remote: Optional[str] = None, reset: bool = True) -> str:
    """Push to a remote branch. If the remote branch is protected a new remote branch will be created and pushed to.

    Args:
        repository(Repository): The current repository.
        remote(str, optional): The remote (URL or name) to push to (Default value = None).
        reset(bool, optional): Whether to reset active branch to its upstream branch, used if changes get
            pushed to a temporary branch (Default value = True).
    Raises:
        errors.GitError: If there's no remote or the push fails.
    Returns:
        str: Name of the branch that was pushed to.
    """
    from renku.core.util import communication

    if repository.active_branch is None:
        raise errors.GitError("Cannot push changes when repository is in detached HEAD state.")

    if repository.active_branch.remote_branch:
        ref = repository.active_branch.remote_branch.name
        # NOTE: Assumes the tracking ref is named ``<remote>/<branch>``; drop only the leading remote name so that
        # branch names which contain "/" themselves (e.g. ``feature/x``) are kept intact.
        pushed_branch = ref.split("/", 1)[-1]
    else:
        pushed_branch = repository.active_branch.name

    if remote is not None:
        # NOTE: ``remote`` may be either a URL or a remote name; try both.
        pushed_remote = get_remote(repository, url=remote) or get_remote(repository, name=remote)
        if not pushed_remote:
            if get_remote(repository, name="origin") is not None:
                # NOTE: "origin" is taken, so add the new remote (and target branch) under random names.
                pushed_branch = uuid4().hex
                pushed_remote_name = uuid4().hex
            else:
                pushed_remote_name = "origin"
            pushed_remote = repository.remotes.add(name=pushed_remote_name, url=remote)
    else:
        pushed_remote = get_remote(repository)
        if not pushed_remote:
            raise errors.GitRemoteNotFoundError("No remote has been set up for the current branch")

    try:
        # NOTE: Push local changes to remote branch.
        merge_conflict = False
        if len(pushed_remote.references) > 0 and repository.active_branch.remote_branch in pushed_remote.references:
            repository.fetch(pushed_remote)
            try:
                repository.pull(pushed_remote, repository.active_branch)
            except errors.GitCommandError as e:
                # NOTE: Couldn't pull, probably due to conflicts, try a merge.
                # NOTE: the error sadly doesn't tell any details.
                unmerged_blobs = repository.unmerged_blobs.values()
                conflicts = (stage != 0 for blobs in unmerged_blobs for stage, _ in blobs)
                if any(conflicts):
                    merge_conflict = True

                    if communication.confirm(
                        "There were conflicts when updating the local data with remote changes,"
                        " do you want to resolve them (if not, a new remote branch will be created)?",
                        warning=True,
                    ):
                        repository.run_git_command("mergetool", "-g")
                        repository.commit("merging conflict", no_edit=True)
                        merge_conflict = False
                    else:
                        repository.reset(hard=True)
                else:
                    raise errors.GitError("Cannot pull changes from remote") from e

        push_failed = False

        if not merge_conflict:
            # NOTE: Try pushing to remote branch which might fail if the branch is protected
            try:
                repository.push(pushed_remote, f"{repository.active_branch.name}:{pushed_branch}")
            except errors.GitCommandError:
                push_failed = True

        if merge_conflict or push_failed:
            # NOTE: Push to a new remote branch and reset the cache.
            last_short_sha = repository.head.commit.hexsha[0:8]
            old_active_branch = str(repository.active_branch)
            # NOTE: The two extra characters account for the "/" separators in the generated branch name.
            fixed_chars_len = len(PROTECTED_BRANCH_PREFIX) + len(last_short_sha) + 2
            if len(old_active_branch) + fixed_chars_len > BRANCH_NAME_LIMIT:
                old_branch_reference = old_active_branch[0 : (BRANCH_NAME_LIMIT - fixed_chars_len)]
            else:
                old_branch_reference = old_active_branch
            pushed_branch = f"{PROTECTED_BRANCH_PREFIX}/{old_branch_reference}/{last_short_sha}"
            try:
                repository.branches.add(pushed_branch)
                repository.checkout(pushed_branch)
                repository.push(pushed_remote, pushed_branch, set_upstream=True)
            except Exception:
                reset = False  # NOTE: Don't reset the repository if push to the new remote branch failed
                raise
            finally:
                # NOTE: Always go back to the branch the user was on.
                repository.checkout(old_active_branch)
                if reset:
                    repository.reset(reference=repository.active_branch.remote_branch, hard=True)
    except errors.GitCommandError as e:
        raise errors.GitError("Cannot push changes") from e

    return pushed_branch
5✔
619

620

621
def clone_renku_repository(
    url: str,
    path: Optional[Union[Path, str]],
    gitlab_token=None,
    deployment_hostname=None,
    depth: Optional[int] = None,
    install_githooks=False,
    install_lfs=True,
    skip_smudge=True,
    recursive=False,
    progress=None,
    config: Optional[dict] = None,
    raise_git_except=False,
    checkout_revision=None,
    use_renku_credentials: bool = False,
    reuse_existing_repository: bool = False,
) -> "Repository":
    """Clone a Renku Repository.

    The URL that is actually cloned depends on the input: a resolved local path for ``localhost`` URLs, an OAuth2
    URL when a GitLab token is given, a Renku URL (with a credential helper) when Renku credentials are requested
    and available for the host, and the unchanged ``url`` otherwise.

    Args:
        url(str): The Git URL to clone.
        path(Union[Path, str]): The path to clone into.
        gitlab_token: The gitlab OAuth2 token (Default value = None).
        deployment_hostname: The hostname of the current renku deployment (Default value = None).
        depth(Optional[int], optional): The clone depth, number of commits from HEAD (Default value = None).
        install_githooks: Whether to install git hooks (Default value = False).
        install_lfs: Whether to install Git LFS (Default value = True).
        skip_smudge: Whether to pull files from Git LFS (Default value = True).
        recursive: Whether to clone recursively (Default value = False).
        progress: The GitProgress object (Default value = None).
        config(Optional[dict], optional): Set configuration for the project (Default value = None).
        raise_git_except: Whether to raise git exceptions (Default value = False).
        checkout_revision: The revision to check out after clone (Default value = None).
        use_renku_credentials(bool, optional): Whether to use Renku provided credentials (Default value = False).
        reuse_existing_repository(bool, optional): Whether to clone over an existing repository (Default value = False).

    Returns:
        The cloned repository.
    """
    from renku.core.login import has_credentials_for_hostname

    parsed_url = parse_git_url(url)

    # NOTE: Defaults for the plain case: clone ``url`` as is, no extra options, no backup remote.
    git_url = url
    clone_options = None
    create_backup = False

    if parsed_url.hostname == "localhost":
        git_url = str(Path(parsed_url.path).resolve())
    elif parsed_url.scheme in ("http", "https"):
        if gitlab_token:
            git_url = get_oauth_url(url, gitlab_token)
        # NOTE: Don't change remote URL if no credentials exist
        elif use_renku_credentials and has_credentials_for_hostname(parsed_url.hostname):
            clone_options = [f"--config credential.helper='!renku credentials --hostname {parsed_url.hostname}'"]
            deployment_hostname = deployment_hostname or parsed_url.hostname
            git_url = get_renku_repo_url(url, deployment_hostname=deployment_hostname, access_token=None)
            create_backup = True

    repository = clone_repository(
        git_url,
        path=path,
        install_githooks=install_githooks,
        depth=depth,
        clean=reuse_existing_repository,
        install_lfs=install_lfs,
        skip_smudge=skip_smudge,
        recursive=recursive,
        progress=progress,
        config=config,
        raise_git_except=raise_git_except,
        checkout_revision=checkout_revision,
        clone_options=clone_options,
    )

    if create_backup:
        # NOTE: Keep the original URL reachable through a backup remote since "origin" now uses the Renku URL.
        create_backup_remote(repository=repository, remote_name="origin", url=url)
        helper_hostname = deployment_hostname or parsed_url.hostname
        set_git_credential_helper(repository=cast("Repository", repository), hostname=helper_hostname)

    return repository
5✔
707

708

709
def clone_repository(
    url,
    path: Optional[Union[Path, str]] = None,
    install_githooks=True,
    install_lfs=True,
    skip_smudge=True,
    recursive=False,
    depth=None,
    progress: Optional[git.RemoteProgress] = None,
    config: Optional[dict] = None,
    raise_git_except=False,
    checkout_revision=None,
    no_checkout: bool = False,
    clean: bool = False,
    clone_options: Optional[List[str]] = None,
) -> "Repository":
    """Clone a Git repository and install Git hooks and LFS.

    If the target path already holds a repository whose remote matches ``url``, that repository is reset, fetched,
    checked out and returned as-is (no configuration, hooks or LFS are installed in this case).

    Args:
        url: The Git URL to clone.
        path(Union[Path, str], optional): The path to clone into; derived from the URL when empty
            (Default value = None).
        install_githooks: Whether to install git hooks (Default value = True).
        install_lfs: Whether to install Git LFS (Default value = True).
        skip_smudge: Whether to skip downloading Git LFS file contents; exported to the process environment as
            ``GIT_LFS_SKIP_SMUDGE`` and passed to the LFS installation (Default value = True).
        recursive: Whether to clone recursively (Default value = False).
        depth: The clone depth, number of commits from HEAD (Default value = None).
        progress: The GitProgress object (Default value = None).
        config(Optional[dict], optional): Set configuration for the project; keys are ``section.option``
            (Default value = None).
        raise_git_except: Whether to re-raise the original git command error instead of a formatted ``GitError``
            when cloning fails (Default value = False).
        checkout_revision: The revision to check out after clone (Default value = None).
        no_checkout(bool, optional): Whether to skip the checkout (Default value = False).
        clean(bool, optional): Whether to delete an existing target folder that cannot be reused
            (Default value = False).
        clone_options(List[str], optional): Additional clone options (Default value = None).

    Raises:
        errors.GitError: If cloning fails or a configuration key has no section.
        errors.GitReferenceNotFoundError: If ``checkout_revision`` cannot be checked out.
        errors.InvalidFileOperation: If an existing target folder cannot be deleted.

    Returns:
        The cloned repository.
    """
    from renku.core.githooks import install_githooks as install_githooks_function
    from renku.infrastructure.repository import Repository

    # NOTE: A dedicated, always-set local keeps the nested helpers free of casts and ``None`` checks
    target_path: Path = Path(path) if path else Path(get_repository_name(url))

    def error_from_progress(progress: Optional[git.RemoteProgress], url: str) -> errors.GitError:
        """Format a Git command error into a more user-friendly format."""
        message = f"Cannot clone repo from {url}"

        if progress:
            lines = progress.other_lines + progress.error_lines
            error = "".join([f"\n\t{line}" for line in lines if line.strip()])
            message += f" - error message:\n {error}"

        return errors.GitError(message)

    def clean_directory(clean: bool):
        """Delete the target directory when ``clean`` is set; a missing directory is not an error."""
        if not clean:
            return
        try:
            shutil.rmtree(target_path)
        except FileNotFoundError:
            pass
        except PermissionError as e:
            raise errors.InvalidFileOperation(f"Cannot delete files in {target_path}: Permission denied") from e

    def check_and_reuse_existing_repository() -> Optional["Repository"]:
        """Return the repository already present at the target path if its remote matches ``url``."""
        if not target_path.exists():
            return None

        try:
            repository = Repository(target_path)
            remote = get_remote(repository, name="origin") or get_remote(repository)

            if remote and have_same_remote(remote.url, url):
                repository.reset(hard=True)
                repository.fetch(all=True, tags=True)
                # NOTE: By default we check out remote repository's HEAD since the local HEAD might not point to
                # the default branch.
                default_checkout_revision = checkout_revision or "origin/HEAD"
                repository.checkout(default_checkout_revision)
                try:
                    repository.pull()
                except errors.GitCommandError:  # NOTE: When ref is not a branch, an error is thrown
                    pass
            else:
                # NOTE: not same remote, so don't reuse
                clean_directory(clean=clean)
                return None
        except errors.GitError:  # NOTE: Not a git repository, remote not found, or checkout failed
            clean_directory(clean=clean)
        else:
            return repository

        return None

    def clone(branch, depth):
        """Run the actual clone into the target path."""
        # NOTE: This changes the environment of the whole process and is not restored afterwards
        os.environ["GIT_LFS_SKIP_SMUDGE"] = "1" if skip_smudge else "0"

        return Repository.clone_from(
            url,
            target_path,
            branch=branch,
            recursive=recursive,
            depth=depth,
            no_checkout=no_checkout,
            progress=progress,
            clone_options=clone_options,
        )

    assert config is None or isinstance(config, dict), f"Config should be a dict not '{type(config)}'"

    existing_repository = check_and_reuse_existing_repository()
    if existing_repository is not None:
        return existing_repository

    try:
        # NOTE: Try to clone, assuming checkout_revision is a branch or a tag (if it is set)
        repository = clone(branch=checkout_revision, depth=depth)
    except errors.GitCommandError as e:
        if not checkout_revision:
            if raise_git_except:
                raise
            raise error_from_progress(progress, url) from e

        # NOTE: Delete the partially-cloned repository
        clean_directory(clean=True)

        # NOTE: clone without branch set, in case checkout_revision was not a branch or a tag but a commit
        try:
            repository = clone(branch=None, depth=None)
        except errors.GitCommandError as retry_error:
            if raise_git_except:
                raise
            raise error_from_progress(progress, url) from retry_error

    if checkout_revision is not None and not no_checkout:
        try:
            repository.checkout(checkout_revision)
        except errors.GitCommandError as e:
            # NOTE: Keep the git error as the explicit cause so the failing command stays visible in tracebacks
            raise errors.GitReferenceNotFoundError(
                f"Cannot checkout reference '{checkout_revision}' in repository: {url}"
            ) from e

    if config:
        with repository.get_configuration(writable=True) as config_writer:
            for key, value in config.items():
                try:
                    # NOTE: Everything before the last dot is the section, e.g. ``remote.origin`` + ``url``
                    section, option = key.rsplit(".", maxsplit=1)
                except ValueError as e:
                    raise errors.GitError(f"Cannot write to config: Invalid config '{key}'") from e

                config_writer.set_value(section, option, value)

    if install_githooks:
        install_githooks_function(force=True, path=repository.path)

    if install_lfs:
        repository.lfs.install(skip_smudge=skip_smudge)

    return repository
5✔
868

869

870
def get_git_progress_instance():
    """Create a progress reporter that prints Git operation status on the console."""
    from git.remote import RemoteProgress

    class GitProgress(RemoteProgress):
        """Console progress output for GitPython operations."""

        def __init__(self):
            """Set up the printer with no previously printed line."""
            super().__init__()
            self._previous_line_length = 0

        def _clear_line(self):
            """Overwrite the previously printed line with spaces."""
            blanks = " " * self._previous_line_length
            print(blanks, end="\r")

        def update(self, op_code, cur_count, max_count=None, message=""):
            """Print the current progress line; called by GitPython on every status change."""
            self._clear_line()

            current_line = self._cur_line
            print(current_line, end="\r")

            # NOTE: Remember how much was printed so that the next update can blank it out
            if current_line:
                self._previous_line_length = len(current_line)
            else:
                self._previous_line_length = 0

            # NOTE: Move to a fresh line once the operation has finished
            if op_code & RemoteProgress.END:
                print()

    return GitProgress()
3✔
894

895

896
def get_file_size(repository_path: Path, path: str) -> Optional[int]:
    """Return file size for a file inside a git repository.

    The size reported by Git LFS is preferred; if the file is not listed by Git LFS (or Git LFS cannot be run), the
    size of the file on disk is returned.

    Args:
        repository_path(Path): Directory in which the ``git lfs`` command is run and against which ``path`` is
            resolved.
        path(str): Path of the file, relative to ``repository_path``.

    Returns:
        Optional[int]: The size in bytes or ``None`` if the file is neither tracked by Git LFS nor present on disk.
    """
    # NOTE: First try to get file size from Git LFS
    try:
        lfs_run = run(
            ("git", "lfs", "ls-files", "--name-only", "--size"),
            stdout=PIPE,
            cwd=repository_path,
            text=True,
        )
    except (OSError, SubprocessError):
        # NOTE: ``run`` raises ``OSError`` (e.g. ``FileNotFoundError``) when git or the directory doesn't exist
        lfs_output = ""
    else:
        lfs_output = lfs_run.stdout or ""

    # Example line format: relative/path/to/file (7.9 MB)
    pattern = re.compile(r"^(?P<name>.*) \((?P<size>[^()]*)\)\s*$")
    conversions = {"b": 1, "kb": 1e3, "mb": 1e6, "gb": 1e9, "tb": 1e12, "pb": 1e15}
    wanted_name = os.path.normpath(path)

    for line in lfs_output.splitlines():
        match = pattern.match(line)
        if not match:
            continue
        # NOTE: Compare whole names; a substring test would let ``a.txt`` pick up the size of ``data.txt``
        if os.path.normpath(match.group("name")) != wanted_name:
            continue
        size_info = match.group("size").split()
        if len(size_info) != 2:
            continue
        try:
            size = float(size_info[0])
        except ValueError:
            continue
        multiplier = conversions.get(size_info[1].strip().lower())
        if multiplier is None:
            continue
        return int(size * multiplier)

    # Return size of the file on disk
    full_path = repository_path / path
    return os.path.getsize(full_path) if full_path.exists() else None
×
935

936

937
def shorten_message(message: str, line_length: int = 100, body_length: int = 65000) -> str:
    """Wraps and shortens a commit message.

    The message is first cut to ``body_length`` characters (the cut is marked with an ellipsis) and then wrapped:
    the text is split on single spaces, words that are too long are broken into chunks, and every wrapped line is
    started with a newline followed by a tab.

    Args:
        message(str): message to adjust.
        line_length(int, optional): maximum line length before wrapping. 0 for infinite (Default value = 100).
        body_length(int, optional): maximum body length before cut. 0 for infinite (Default value = 65000).
    Raises:
        ParameterError: If line_length or body_length < 0
    Returns:
        message wrapped and trimmed.

    """
    if line_length < 0:
        raise errors.ParameterError("the length can't be negative.", "line_length")

    if body_length < 0:
        raise errors.ParameterError("the length can't be negative.", "body_length")

    if body_length and len(message) > body_length:
        # NOTE: For limits shorter than the ellipsis, shorten the ellipsis too; a negative slice bound would
        # otherwise keep almost the whole message and exceed the limit.
        message = message[: max(body_length - 3, 0)] + "..."[:body_length]

    if line_length == 0 or len(message) <= line_length:
        return message

    # NOTE: Words that don't fit on a line are broken into line-sized chunks
    words = [
        word
        if len(word) < line_length
        else "\n\t".join(word[o : o + line_length] for o in range(0, len(word), line_length))
        for word in message.split(" ")
    ]

    # NOTE: tries to preserve message spacing.
    pieces = []
    current_length = 0
    for word in words:
        if current_length + len(word) <= line_length:
            pieces.append(f" {word}")
            current_length += len(word) + 1
        else:
            pieces.append(f"\n\t{word}")
            current_length = len(word)

    # NOTE: Every piece starts with a separator; drop the one in front of the first word
    return "".join(pieces)[1:]
5✔
979

980

981
def get_in_submodules(
    repository: "Repository", commit: "Commit", path: Union[Path, str]
) -> Tuple["Repository", "Commit", Path]:
    """Find the submodule (if any) that a symlinked or vendored path points into.

    Args:
        repository("Repository"): The repository that contains ``path``.
        commit("Commit"): The commit to start the lookup from.
        path(Union[Path, str]): Path inside ``repository``.

    Returns:
        Tuple["Repository", "Commit", Path]: The matching submodule, the commit found in it and the path relative to
            it; the unchanged ``repository`` and ``commit`` with ``path`` when no submodule matches.
    """
    candidate = repository.path / path

    # NOTE: Only symlinks and vendored paths can point into a submodule
    if not (candidate.is_symlink() or str(path).startswith(".renku/vendors")):
        return repository, commit, Path(path)

    target = candidate.resolve()

    for submodule in repository.submodules:  # type: ignore
        if not (submodule.path / ".git").exists():
            continue

        try:
            relative_path = target.relative_to(submodule.path)
            commit = submodule.get_previous_commit(path=relative_path, revision=commit.hexsha)
        except (ValueError, errors.GitCommitNotFoundError):
            # NOTE: Target is outside of this submodule or has no commit there; try the next one
            continue

        return submodule, commit, relative_path

    return repository, commit, Path(path)
6✔
1004

1005

1006
def get_dirty_paths(repository: "Repository") -> Set[str]:
    """Collect untracked, modified and staged files, each joined with the repository path."""
    unstaged = [change.b_path for change in repository.unstaged_changes if change.b_path]

    # NOTE: Staged changes are only queried when HEAD is valid
    staged = []
    if repository.head.is_valid():
        staged = [change.a_path for change in repository.staged_changes]

    dirty_files = repository.untracked_files + unstaged + staged

    return {os.path.join(repository.path, file_path) for file_path in dirty_files}
7✔
1012

1013

1014
@contextlib.contextmanager
def with_commit(
    *,
    repository: "Repository",
    transaction_id: str,
    commit_only=None,
    commit_empty=True,
    raise_if_empty=False,
    commit_message=None,
    abbreviate_message=True,
    skip_dirty_checks=False,
):
    """Context manager that commits once the wrapped block has finished.

    The repository state is recorded with ``prepare_commit`` before the block runs and the commit is created with
    ``finalize_commit`` afterwards. Nothing is committed if the block raises.
    """
    snapshot = prepare_commit(
        repository=repository,
        commit_only=commit_only,
        skip_dirty_checks=skip_dirty_checks,
    )

    yield

    finalize_options = {
        "diff_before": snapshot,
        "repository": repository,
        "transaction_id": transaction_id,
        "commit_only": commit_only,
        "commit_empty": commit_empty,
        "raise_if_empty": raise_if_empty,
        "commit_message": commit_message,
        "abbreviate_message": abbreviate_message,
    }
    finalize_commit(**finalize_options)
1041

1042

1043
def prepare_commit(*, repository: "Repository", commit_only=None, skip_dirty_checks=False, skip_staging: bool = False):
    """Gather information about repo needed for committing later on.

    Args:
        repository("Repository"): The repository that will be committed to.
        commit_only: A list of paths to commit, ``COMMIT_DIFF_STRATEGY`` or ``None`` (Default value = None).
        skip_dirty_checks: Whether to skip checking that the paths in ``commit_only`` are neither untracked nor
            staged (Default value = False).
        skip_staging(bool): Whether the staging area will be bypassed on commit; requires a non-empty list in
            ``commit_only`` (Default value = False).

    Raises:
        errors.OperationError: If ``skip_staging`` is set without a non-empty list of paths.
        errors.DirtyRenkuDirectory: If a path in ``commit_only`` is untracked or staged.

    Returns:
        set: Untracked files that existed before the command started (only filled for ``COMMIT_DIFF_STRATEGY``).
    """

    def ensure_not_untracked(path):
        """Ensure that path is not part of git untracked files."""
        for file_path in repository.untracked_files:
            is_parent = (repository.path / file_path).parent == (repository.path / path)
            is_equal = str(path) == file_path

            if is_parent or is_equal:
                raise errors.DirtyRenkuDirectory(repository)

    def ensure_not_staged(path):
        """Ensure that path is not part of git staged files."""
        path = str(path)
        for file_path in repository.staged_changes:
            # NOTE(review): this is a plain string-prefix test, so ``data`` also matches ``database.txt`` - confirm
            is_parent = str(file_path.a_path).startswith(path)
            is_equal = path == file_path.a_path

            if is_parent or is_equal:
                raise errors.DirtyRenkuDirectory(repository)

    if skip_staging:
        if not isinstance(commit_only, list) or len(commit_only) == 0:
            raise errors.OperationError("Cannot use ``skip_staging`` without specifying files to commit.")

    diff_before = set()

    if commit_only == COMMIT_DIFF_STRATEGY:
        if len(repository.staged_changes) > 0 or len(repository.unstaged_changes) > 0:
            repository.reset()

        # Exclude files created by pipes.
        # NOTE: Untracked files are resolved against the repository path (like ``finalize_commit`` does) and not
        # against the current working directory, which might be a different folder.
        # NOTE(review): presumably ``STARTED_AT`` is a timestamp in milliseconds - confirm at its definition
        diff_before = {
            file
            for file in repository.untracked_files
            if STARTED_AT - int((repository.path / file).stat().st_ctime * 1e3) >= 1e3
        }

    if isinstance(commit_only, list) and not skip_dirty_checks:
        for path in commit_only:
            ensure_not_untracked(path)
            ensure_not_staged(path)

    return diff_before
8✔
1086

1087

1088
def finalize_commit(
    *,
    diff_before,
    repository: "Repository",
    transaction_id: str,
    commit_only=None,
    commit_empty=True,
    raise_if_empty=False,
    commit_message=None,
    abbreviate_message=True,
    skip_staging: bool = False,
):
    """Stage the requested paths and create the commit.

    Args:
        diff_before: Set of untracked files recorded before the command ran.
        repository("Repository"): The repository to commit to.
        transaction_id(str): Text appended to the commit message.
        commit_only: A list of paths to commit, ``COMMIT_DIFF_STRATEGY`` or ``None`` (Default value = None).
        commit_empty: Whether to commit when nothing is staged (Default value = True).
        raise_if_empty: Whether to raise instead of returning silently when nothing is staged and ``commit_empty``
            is not set (Default value = False).
        commit_message: The commit message; built from the command line when empty (Default value = None).
        abbreviate_message: Whether to pass the message through ``shorten_message`` (Default value = True).
        skip_staging(bool): Whether to commit only the paths in ``commit_only`` (Default value = False).

    Raises:
        errors.NothingToCommit: If nothing is staged, ``commit_empty`` is not set and ``raise_if_empty`` is set.
        errors.CommitMessageEmpty: If ``commit_message`` is set but is not a string.
    """
    from renku.core.util.urls import remove_credentials
    from renku.infrastructure.repository import Actor
    from renku.version import __version__, version_url

    committer = Actor(name=f"renku {__version__}", email=version_url)

    change_types = {change.a_path: change.change_type for change in repository.unstaged_changes}

    if commit_only == COMMIT_DIFF_STRATEGY:
        # Get diff generated in command.
        unstaged_paths = set(change_types.keys())

        staged_change_types = {change.a_path: change.change_type for change in repository.staged_changes}
        staged_paths = set(staged_change_types.keys())
        change_types.update(staged_change_types)

        touched_paths = set(repository.untracked_files).union(unstaged_paths).union(staged_paths)

        # Remove files not touched in command.
        commit_only = list(touched_paths - diff_before)

    if isinstance(commit_only, list):
        for path_ in commit_only:
            is_deleted = change_types.get(str(path_)) == DiffChangeType.DELETED
            # NOTE: Deleted files don't exist on disk anymore but their removal still has to be staged
            if (repository.path / path_).exists() or is_deleted:
                repository.add(path_)

    if not commit_only:
        repository.add(all=True)

    try:
        staged = [change.a_path for change in repository.staged_changes]
    except errors.GitError:
        staged = []

    if not (commit_empty or staged):
        if raise_if_empty:
            raise errors.NothingToCommit()
        return

    if not commit_message:
        # NOTE: Default to the executed command line, with credentials removed from its arguments
        command_line = [os.path.basename(sys.argv[0])]
        command_line.extend(remove_credentials(arg) for arg in sys.argv[1:])
        commit_message = " ".join(command_line)
    elif not isinstance(commit_message, str):
        raise errors.CommitMessageEmpty()

    if abbreviate_message:
        commit_message = shorten_message(commit_message)

    # NOTE: Only commit specified paths when skipping staging area
    paths = commit_only if skip_staging else []

    # Ignore pre-commit hooks since we have already done everything.
    repository.commit(commit_message + transaction_id, committer=committer, no_verify=True, paths=paths)
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc