• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 7790158199

05 Feb 2024 08:01PM UTC coverage: 85.162% (+0.01%) from 85.151%
7790158199

Pull #3690

github

web-flow
Merge 23fad85cd into 33a29aa3f
Pull Request #3690: fix(core): migrate Dockerfile after metadata

27 of 29 new or added lines in 3 files covered. (93.1%)

5 existing lines in 4 files now uncovered.

26659 of 31304 relevant lines covered (85.16%)

3.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.7
/renku/command/rollback.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Renku ``rollback`` command."""
1✔
17

18
import os.path
1✔
19
import re
1✔
20
from datetime import datetime
1✔
21
from itertools import islice
1✔
22
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Tuple, cast
1✔
23

24
from renku.command.command_builder.command import Command
1✔
25
from renku.core.util import communication
1✔
26
from renku.domain_model.dataset import Dataset
1✔
27
from renku.domain_model.project_context import project_context
1✔
28
from renku.domain_model.provenance.activity import Activity
1✔
29
from renku.domain_model.workflow.plan import AbstractPlan
1✔
30

31
if TYPE_CHECKING:
1✔
32
    from renku.infrastructure.repository import Commit
×
33

34
# Number of checkpoints fetched and listed per page of the checkpoint selection prompt.
CHECKPOINTS_PER_PAGE = 50
1✔
35

36

37
def rollback_command():
    """Build the command that performs a rollback of the repo.

    Returns:
        The command builder wrapping ``_rollback_command`` with the ``require_clean``,
        ``require_migration`` and ``with_database`` options applied, in that order.
    """
    builder = Command().command(_rollback_command)
    builder = builder.require_clean().require_migration()
    return builder.with_database()
1✔
40

41

42
def _rollback_command():
    """Interactively roll the repository back to a checkpoint chosen by the user.

    The user picks a checkpoint from the commits that touched the metadata path, is shown a summary of
    what would change relative to ``HEAD`` and, after confirming, the repository is hard-reset to the
    checkpoint's commit. Nothing is reset when no checkpoint is chosen or when there would be no changes.
    """
    history: Generator["Commit", None, None] = project_context.repository.iterate_commits(
        project_context.metadata_path
    )

    chosen = _prompt_for_checkpoint(history)
    if not chosen:
        # No checkpoint was selected, so there is nothing to do.
        return

    # Checkpoints are tuples whose second element is the commit to roll back to.
    target_commit = chosen[1]
    message, changed = _get_confirmation_message(target_commit.get_changes(commit="HEAD"))

    if changed:
        communication.confirm(message, abort=True)
        project_context.repository.reset(target_commit, hard=True)
        return

    communication.echo("There would be no changes rolling back to the selected command, exiting.")
1✔
62

63

64
def _get_confirmation_message(diff) -> Tuple[str, bool]:
    """Build the confirmation text describing what a rollback would change.

    Args:
        diff: Diff between two commits.

    Returns:
        Tuple[str, bool]: The confirmation message and whether the rollback would change anything.
    """
    modifications = _get_modifications_from_diff(diff)

    # Sections and change kinds, listed in the order they appear in the message.
    sections = (("metadata", "Metadata"), ("files", "Files"))
    change_kinds = (("restored", "Restored ↻"), ("modified", "Modified ♻️"), ("removed", "Removed 🔥"))

    parts = ["The following changes would be done:\n"]
    has_changes = False

    for section_key, section_title in sections:
        # The section header is always emitted, even when the section has no entries.
        parts.append(f"\n{section_title}:\n")

        for kind_key, kind_title in change_kinds:
            entries = modifications[section_key][kind_key]
            if not entries:
                continue

            parts.append(f"\n{kind_title}:\n\t" + "\n\t".join(entries) + "\n")
            has_changes = True

    parts.append("\nProceed?")

    return "".join(parts), has_changes
1✔
108

109

110
def _get_modifications_from_diff(diff):
    """Classify every entry of a diff by what a rollback would do to it.

    Args:
        diff: Diff between two commits.

    Returns:
        Dict with the keys ``"metadata"`` and ``"files"``, each mapping ``"restored"``, ``"modified"`` and
        ``"removed"`` to a list of display strings.
    """
    metadata_changes: Dict[str, List[str]] = {"restored": [], "modified": [], "removed": []}
    file_changes: Dict[str, List[str]] = {"restored": [], "modified": [], "removed": []}

    # identifier -> (display label, change type, date) of the earliest change seen for that object
    earliest_by_object: Dict[str, Tuple[str, str, datetime]] = {}

    for change in diff:
        entry = change.a_path or change.b_path
        entry_path = project_context.path / entry

        if str(project_context.database_path) == os.path.commonpath([project_context.database_path, entry_path]):
            # Entry lives inside the metadata database.
            info = _get_modification_type_from_db(entry)

            if info:
                label, change_type, identifier, entry_date = info
                known = earliest_by_object.get(identifier)

                # Only the least recent change of a metadata object is reported.
                if known is None or entry_date < known[2]:
                    earliest_by_object[identifier] = (label, change_type, entry_date)

            continue

        if str(project_context.metadata_path) == os.path.commonpath([project_context.metadata_path, entry_path]):
            # Some other renku-internal file; not reported.
            continue

        # Regular project file: a file added by the diff is removed by the rollback and vice versa.
        if change.added:
            bucket = "removed"
        elif change.deleted:
            bucket = "restored"
        else:
            bucket = "modified"

        file_changes[bucket].append(entry)

    for label, change_type, _ in earliest_by_object.values():
        metadata_changes[change_type].append(label)

    return {"metadata": metadata_changes, "files": file_changes}
1✔
162

163

164
def _prompt_for_checkpoint(commits):
    """Ask to select a checkpoint to rollback to.

    Checkpoints are listed in pages of ``CHECKPOINTS_PER_PAGE`` entries that are numbered continuously
    across pages, so every checkpoint listed so far can be selected by its number.

    Args:
        commits: Commits a user can choose from.

    Returns:
        The checkpoint chosen by the user (an item produced by ``_checkpoint_iterator``), or ``None`` if
        the user quits or there is no checkpoint to choose from.
    """
    checkpoint_iterator = _checkpoint_iterator(commits)
    all_checkpoints = []

    # prompt user to select a checkpoint in batches
    while True:
        batch = list(islice(checkpoint_iterator, CHECKPOINTS_PER_PAGE))
        # A full page means there may be further checkpoints to fetch.
        more_pages = len(batch) == CHECKPOINTS_PER_PAGE

        if not batch and not all_checkpoints:
            # Nothing to choose from at all: say so instead of prompting for an impossible selection.
            communication.echo("No valid renku commands in project to roll back to.")
            return None

        # Prompt without the listing; reused whenever the user has to be asked again on this page.
        short_prompt = "Checkpoint ([q] to quit" + (", [m] for more)" if more_pages else ")")

        if batch:
            if not all_checkpoints:
                communication.echo("Select a checkpoint to roll back to:\n")

            first_index = len(all_checkpoints)
            all_checkpoints.extend(batch)
            listing = "\n".join(f"[{i}] {entry[0]}" for i, entry in enumerate(batch, first_index))
            selection = communication.prompt(f"{listing}\n{short_prompt}", default="m" if more_pages else "q")
        else:
            # The previous page was exactly full and was the last one; ask again without paging option.
            communication.echo("No more checkpoints.")
            selection = communication.prompt(short_prompt, default="q")

        while True:
            # loop until user makes a valid selection
            if selection == "q":
                return None

            if selection == "m" and more_pages:
                # fetch and show the next page
                break

            try:
                selected = int(selection)
            except (ValueError, TypeError):
                communication.warn(
                    "Please enter a valid checkpoint number" + (", 'q' or 'm'" if more_pages else " or 'q'")
                )
            else:
                if 0 <= selected < len(all_checkpoints):
                    return all_checkpoints[selected]

                communication.warn("Not a valid checkpoint")

            selection = communication.prompt(short_prompt, default="q")
1✔
242

243

244
def _get_modification_type_from_db(path: str) -> Optional[Tuple[str, str, str, datetime]]:
    """Describe the change a rollback would make to the database object stored at ``path``.

    Args:
        path(str): Path to database object.

    Returns:
        A tuple of display label, change type (``"removed"``, ``"modified"`` or ``"restored"``),
        identifier and date for activities, plans and datasets; ``None`` for any other object.
    """
    database = project_context.database
    # Objects are looked up in the database by the file name of the entry.
    db_object = database.get(os.path.basename(path))

    if isinstance(db_object, Activity):
        label = f"Run: {db_object.id} (Plan name: {db_object.association.plan.name})"
        return label, "removed", db_object.id, db_object.ended_at_time

    if isinstance(db_object, AbstractPlan):
        change_type = "removed"

        if db_object.derived_from:
            derived = database.get_by_id(db_object.derived_from)
            # A derived plan only counts as a modification if it kept the name of the plan it derives from.
            if db_object.name == derived.name:
                change_type = "modified"

        # A removal date takes precedence over the derivation check above.
        if db_object.date_removed:
            change_type = "restored"

        label = f"Plan: {db_object.name}"
        identifier = f"plan_{db_object.name}"
        return label, change_type, identifier, db_object.date_removed or db_object.date_created

    if isinstance(db_object, Dataset):
        change_type = "removed"

        if db_object.derived_from:
            change_type = "modified"

        # A removal date takes precedence over the derivation check above.
        if db_object.date_removed:
            change_type = "restored"

        label = f"Dataset: {db_object.name}"
        identifier = f"dataset_{db_object.name}"
        change_date = cast(datetime, db_object.date_removed or db_object.date_published or db_object.date_created)
        return label, change_type, identifier, change_date

    return None
1✔
295

296

297
def _checkpoint_iterator(commits):
    """Iterate through commits to create checkpoints.

    Commits whose message does not end in a ``renku-transaction: <id>`` trailer are skipped. Of a run of
    consecutive commits sharing the same transaction id, only the first one encountered is yielded.

    Args:
        commits: Commits to iterate through; each must provide ``message``, ``authored_datetime`` and
            ``hexsha``.

    Yields:
        Tuples of ``(label, commit, transaction_id)`` where ``label`` is a display string made of the
        authored date, the abbreviated commit hash and the first line of the commit message.
    """
    # NOTE(review): the character class admits "g", which is not a hex digit -- presumably a typo for
    # "a-f"; left unchanged here so the set of matched commits stays the same. Confirm and tighten.
    transaction_pattern = re.compile(r"\n\nrenku-transaction:\s([0-9a-g]+)$")

    current_checkpoint: Optional[Tuple[str, "Commit", str]] = None

    for commit in commits:
        commit_message = commit.message
        match = transaction_pattern.search(commit_message)

        if not match:
            continue

        # Keep only the captured id; group(0) would also include the "renku-transaction:" trailer text.
        transaction_id = match.group(1)
        summary = commit_message.splitlines()[0]
        entry = (
            f"{commit.authored_datetime:%Y-%m-%d %H:%M:%S} \t{commit.hexsha[:7]}\t{summary}",
            commit,
            transaction_id,
        )

        if current_checkpoint is None:
            current_checkpoint = entry
        elif transaction_id != current_checkpoint[2]:
            # A new transaction starts: emit the pending checkpoint and begin tracking the new one.
            yield current_checkpoint
            current_checkpoint = entry

    if current_checkpoint is not None:
        yield current_checkpoint
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc