• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 6875247711

15 Nov 2023 09:16AM UTC coverage: 82.786% (-0.05%) from 82.831%
6875247711

Pull #3300

github

web-flow
Merge e2d3269e8 into 4726f660e
Pull Request #3300: chore: do not always retry load tests requests

25441 of 30731 relevant lines covered (82.79%)

3.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.01
/renku/core/dataset/dataset.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Dataset business logic."""
7✔
17

18
import imghdr
7✔
19
import os
7✔
20
import shutil
7✔
21
import urllib
7✔
22
from collections import defaultdict
7✔
23
from pathlib import Path
7✔
24
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
7✔
25

26
import patoolib
7✔
27
from pydantic import validate_arguments
7✔
28

29
from renku.command.command_builder.command import inject
7✔
30
from renku.command.view_model.dataset import DatasetFileViewModel, DatasetViewModel
7✔
31
from renku.core import errors
7✔
32
from renku.core.config import get_value, remove_value, set_value
7✔
33
from renku.core.dataset.datasets_provenance import DatasetsProvenance
7✔
34
from renku.core.dataset.pointer_file import delete_external_file, is_linked_file_updated, update_linked_file
7✔
35
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi
7✔
36
from renku.core.dataset.providers.factory import ProviderFactory
7✔
37
from renku.core.dataset.providers.git import GitProvider
7✔
38
from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset
7✔
39
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
7✔
40
from renku.core.image import ImageObjectRequest
7✔
41
from renku.core.interface.dataset_gateway import IDatasetGateway
7✔
42
from renku.core.storage import check_external_storage, track_paths_in_storage
7✔
43
from renku.core.util import communication
7✔
44
from renku.core.util.datetime8601 import local_now
7✔
45
from renku.core.util.git import get_git_user
7✔
46
from renku.core.util.metadata import prompt_for_credentials, read_credentials, store_credentials
7✔
47
from renku.core.util.os import (
7✔
48
    create_symlink,
49
    delete_dataset_file,
50
    delete_path,
51
    get_absolute_path,
52
    get_file_size,
53
    get_files,
54
    get_relative_path,
55
    get_safe_relative_path,
56
    hash_file,
57
    is_path_empty,
58
    is_subpath,
59
    unmount_path,
60
)
61
from renku.core.util.tabulate import tabulate
7✔
62
from renku.core.util.urls import get_slug
7✔
63
from renku.core.util.util import parallel_execute
7✔
64
from renku.domain_model.constant import NO_VALUE, NON_EXISTING_ENTITY_CHECKSUM, NoValueType
7✔
65
from renku.domain_model.dataset import Dataset, DatasetDetailsJson, DatasetFile, RemoteEntity, is_dataset_slug_valid
7✔
66
from renku.domain_model.entity import Entity
7✔
67
from renku.domain_model.enums import ConfigFilter
7✔
68
from renku.domain_model.project_context import project_context
7✔
69
from renku.domain_model.provenance.agent import Person
7✔
70
from renku.domain_model.provenance.annotation import Annotation
7✔
71
from renku.infrastructure.immutable import DynamicProxy
7✔
72

73
if TYPE_CHECKING:
7✔
74
    from renku.core.interface.storage import IStorage
×
75

76

77
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def search_datasets(slug: str) -> List[str]:
    """Get all the datasets whose slug starts with the given string.

    Args:
        slug(str): Beginning of dataset slug to search for.

    Returns:
        List[str]: List of found dataset slugs.
    """
    datasets_provenance = DatasetsProvenance()
    # A comprehension is clearer than the previous filter(map(...)) lambda chain
    return [dataset.slug for dataset in datasets_provenance.datasets if dataset.slug.startswith(slug)]
89

90

91
def list_datasets():
    """List all datasets.

    Returns:
        List[DynamicProxy]: All datasets, each wrapped in a proxy carrying extra
            display attributes (``tags``, ``tags_csv``, ``datadir_path``).
    """
    datasets_provenance = DatasetsProvenance()
    datasets = []

    for dataset in datasets_provenance.datasets:
        tags = datasets_provenance.get_all_tags(dataset)
        # NOTE: Wrap in a DynamicProxy so extra attributes can be attached
        # without mutating the immutable stored dataset.
        dataset = cast(Dataset, DynamicProxy(dataset))
        dataset.tags = tags
        dataset.tags_csv = ",".join(tag.name for tag in tags)
        dataset.datadir_path = str(dataset.get_datadir())
        datasets.append(dataset)

    # ``datasets`` is already a list; the previous ``list(datasets)`` made a redundant copy
    return datasets
105

106

107
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def create_dataset(
    slug: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    creators: Optional[List[Person]] = None,
    keywords: Optional[List[str]] = None,
    images: Optional[List[ImageObjectRequest]] = None,
    update_provenance: bool = True,
    custom_metadata: Optional[Dict[str, Any]] = None,
    storage: Optional[str] = None,
    datadir: Optional[Path] = None,
):
    """Create a dataset.

    Args:
        slug(str): Slug of the dataset
        name(Optional[str], optional): Dataset name (Default value = None).
        description(Optional[str], optional): Dataset description (Default value = None).
        creators(Optional[List[Person]], optional): Dataset creators (Default value = None).
        keywords(Optional[List[str]], optional): Dataset keywords (Default value = None).
        images(Optional[List[ImageObjectRequest]], optional): Dataset images (Default value = None).
        update_provenance(bool, optional): Whether to add this dataset to dataset provenance
            (Default value = True).
        custom_metadata(Optional[Dict[str, Any]], optional): Custom JSON-LD metadata (Default value = None).
        storage(Optional[str], optional): Backend storage's URI (Default value = None).
        datadir(Optional[Path]): Dataset's data directory (Default value = None).

    Returns:
        Dataset: The created dataset.

    Raises:
        errors.ParameterError: If the slug is invalid or ``datadir`` is outside the repository.
        errors.DatasetExistsError: If a dataset with the same slug already exists.
    """
    if not creators:
        # Default the creator list to the current git user, when one is configured
        creators = []
        user = get_git_user(repository=project_context.repository)

        if user:
            creators.append(user)

    if not is_dataset_slug_valid(slug):
        # Suggest a sanitized version of the slug in the error message
        valid_slug = get_slug(slug, lowercase=False)
        raise errors.ParameterError(f"Dataset slug '{slug}' is not valid (Hint: '{valid_slug}' is valid).")

    datasets_provenance = DatasetsProvenance()

    if datasets_provenance.get_by_slug(slug=slug):
        raise errors.DatasetExistsError(slug)

    if not name:
        # Fall back to the slug when no human-readable name is given
        name = slug

    keywords = keywords or []

    annotations = (
        [Annotation(id=Annotation.generate_id(), source="renku", body=custom_metadata)] if custom_metadata else None
    )

    if datadir:
        try:
            # Normalize to a path relative to the project root; rejects paths escaping the repo
            datadir = get_safe_relative_path(datadir, project_context.path)
        except ValueError as e:
            raise errors.ParameterError("Datadir must be inside repository.") from e

    dataset = Dataset(
        identifier=None,
        slug=slug,
        name=name,
        description=description,
        creators=creators,
        keywords=keywords,
        project_id=project_context.project.id,
        annotations=annotations,
        storage=storage,
        datadir=datadir,
    )

    if images:
        set_dataset_images(dataset=dataset, images=images)

    if storage:
        # Backend-storage datasets let the provider do its own initialization
        provider = ProviderFactory.get_create_provider(uri=storage)
        provider.on_create(dataset=dataset)
    else:
        # Local datasets: pick up any files already present in the data directory
        add_datadir_files_to_dataset(dataset)

    if update_provenance:
        datasets_provenance.add_or_update(dataset)

    return dataset
195

196

197
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def edit_dataset(
    slug: str,
    name: Optional[Union[str, NoValueType]],
    description: Optional[Union[str, NoValueType]],
    creators: Optional[Union[List[Person], NoValueType]],
    keywords: Optional[Union[List[str], NoValueType]] = NO_VALUE,
    images: Optional[Union[List[ImageObjectRequest], NoValueType]] = NO_VALUE,
    custom_metadata: Optional[Union[Dict, List[Dict], NoValueType]] = NO_VALUE,
    custom_metadata_source: Optional[Union[str, NoValueType]] = NO_VALUE,
):
    """Edit dataset metadata.

    Args:
        slug(str): Slug of the dataset to edit
        name(Optional[Union[str, NoValueType]]): New name for the dataset.
        description(Optional[Union[str, NoValueType]]): New description for the dataset.
        creators(Optional[Union[List[Person], NoValueType]]): New creators for the dataset.
        keywords(Optional[Union[List[str], NoValueType]]): New keywords for dataset (Default value = ``NO_VALUE``).
        images(Optional[Union[List[ImageObjectRequest], NoValueType]]): New images for dataset
            (Default value = ``NO_VALUE``).
        custom_metadata(Optional[Union[Dict, List[Dict], NoValueType]]): Custom JSON-LD metadata
            (Default value = ``NO_VALUE``).
        custom_metadata_source(Optional[Union[str, NoValueType]]): The custom metadata source
            (Default value = ``NO_VALUE``).

    Returns:
        bool: True if updates were performed.
    """
    if isinstance(name, str):
        name = name.strip()

    if name is None:
        name = ""

    possible_updates = {
        "creators": creators,
        "description": description,
        "keywords": keywords,
        "name": name,
    }

    dataset_provenance = DatasetsProvenance()
    dataset = dataset_provenance.get_by_slug(slug=slug)

    if dataset is None:
        raise errors.ParameterError("Dataset does not exist.")

    # NOTE: ``NO_VALUE`` is a sentinel, so test it by identity (``is not``) rather
    # than equality: ``!=`` would invoke ``__eq__`` of arbitrary user values (e.g.
    # a list of Person). This also matches the ``custom_metadata`` check below.
    updated: Dict[str, Any] = {k: v for k, v in possible_updates.items() if v is not NO_VALUE}

    if updated:
        dataset.update_metadata(creators=creators, description=description, keywords=keywords, name=name)

    if images is NO_VALUE:
        images_updated = False
    else:
        images_updated = set_dataset_images(dataset=dataset, images=cast(Optional[List[ImageObjectRequest]], images))

    if images_updated:
        updated["images"] = (
            None if images is None else [{"content_url": i.content_url, "position": i.position} for i in dataset.images]
        )

    if custom_metadata is not NO_VALUE:
        update_dataset_custom_metadata(
            dataset,
            cast(Optional[Union[Dict, List[Dict]]], custom_metadata),
            cast(Optional[str], custom_metadata_source),
        )
        updated["custom_metadata"] = custom_metadata

    if not updated:
        return []

    # Persist the edited dataset, attributing the change to the current git user
    datasets_provenance = DatasetsProvenance()
    datasets_provenance.add_or_update(dataset, creator=get_git_user(project_context.repository))

    return updated
275

276

277
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def list_dataset_files(
    datasets: Optional[List[str]] = None,
    tag: Optional[str] = None,
    creators: Optional[Union[str, List[str], Tuple[str]]] = None,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
):
    """List dataset files.

    Args:
        datasets(Optional[List[str]]): Datasets to list files for (Default value = None).
        tag(str): Tag to filter by (Default value = None).
        creators(Optional[Union[str, List[str], Tuple[str]]]): Creators to filter by (Default value = None).
        include(Optional[List[str]]): Include filters for file paths (Default value = None).
        exclude(Optional[List[str]]): Exclude filters for file paths (Default value = None).

    Returns:
        List[DynamicProxy]: Filtered dataset files.
    """
    from renku.command.format.dataset_files import get_lfs_tracking_and_file_sizes

    files = filter_dataset_files(
        slugs=datasets, tag=tag, creators=creators, include=include, exclude=exclude, immutable=True
    )

    # Decorate each proxy with flattened attributes that output formatters expect.
    for file in files:
        parent = file.dataset
        file.title = parent.name
        file.dataset_slug = parent.slug
        file.dataset_id = parent.id
        file.creators_csv = parent.creators_csv
        file.creators_full_csv = parent.creators_full_csv
        entity_path = file.entity.path
        file.full_path = project_context.path / entity_path
        file.path = entity_path
        file.name = Path(entity_path).name
        file.added = file.date_added

    # Attach file sizes and git-LFS tracking status in one pass
    get_lfs_tracking_and_file_sizes(files, has_tag=bool(tag))

    return files
316

317

318
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def file_unlink(
    slug: str,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    yes: bool = False,
    dataset_files: Optional[List[DatasetFile]] = None,
):
    """Remove matching files from a dataset.

    Args:
        slug(str): Dataset slug.
        include(Optional[List[str]]): Include filter for files (Default value = None).
        exclude(Optional[List[str]]): Exclude filter for files (Default value = None).
        yes(bool): Whether to skip user confirmation or not (Default value = False).
        dataset_files(Optional[List[DatasetFile]]): Files to remove; ignore include and exclude if passed (Default value
            = None).

    Returns:
        List[DynamicProxy]: List of files that were removed.

    Raises:
        errors.ParameterError: If no filters/files were given, the dataset doesn't exist,
            or no files matched the filters.
    """
    repository = project_context.repository

    if not include and not exclude and not dataset_files:
        raise errors.ParameterError("include or exclude filters not specified.")

    datasets_provenance = DatasetsProvenance()

    dataset = datasets_provenance.get_by_slug(slug=slug)

    if not dataset:
        raise errors.ParameterError("Dataset does not exist.")

    # ``records`` stays empty when explicit ``dataset_files`` were passed in;
    # it is only populated (and returned) when filters selected the files.
    records = []
    if not dataset_files:
        records = filter_dataset_files(slugs=[slug], include=include, exclude=exclude)
        if not records:
            raise errors.ParameterError("No records found.")
        dataset_files = [cast(DatasetFile, r) for r in records]

    if not yes:
        # Interactive confirmation listing every file about to be unlinked; aborts on "no"
        prompt_text = (
            f'You are about to remove following from "{slug}" dataset.'
            + "\n"
            + "\n".join([str(record.entity.path) for record in dataset_files])
            + "\nDo you wish to continue?"
        )
        communication.confirm(prompt_text, abort=True, warning=True)

    for file in dataset_files:
        # Remove the file from the dataset's metadata first
        dataset.unlink_file(file.entity.path)
        path_file = Path(file.entity.path)

        if file.is_external or file.linked:
            # External/linked files are pointer files; delete the pointer, not the target
            try:
                delete_external_file(file)
            except errors.InvalidFileOperation as e:
                communication.warn(f"Cannot delete dataset file {path_file}: {e}.")
        elif dataset.is_within_datadir(path_file):  # NOTE: Remove dataset file only if it's inside dataset's datadir
            datadir = dataset.get_datadir()

            if not path_file.exists():
                communication.warn(f"Dataset file {path_file} doesn't exist, skipping the removal from {datadir}.")
                continue

            # Best-effort deletion: failures are reported as warnings, not raised
            try:
                if path_file.is_dir():
                    shutil.rmtree(str(path_file.absolute()), ignore_errors=False, onerror=None)
                else:
                    path_file.unlink()
            except Exception as err:
                communication.warn(f"Dataset file {path_file} could not be removed from {datadir} because of {err}.")

        # Stage the (now deleted) path so git records the removal
        repository.add(path_file)

    datasets_provenance.add_or_update(dataset, creator=get_git_user(repository))

    return records
396

397

398
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def remove_dataset(slug: str):
    """Delete a dataset.

    Args:
        slug(str): Slug of dataset to delete.
    """
    provenance = DatasetsProvenance()
    # ``strict=True`` raises if no dataset with this slug exists
    dataset_to_remove = provenance.get_by_slug(slug=slug, strict=True)
    provenance.remove(dataset=dataset_to_remove)
408

409

410
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def export_dataset(slug: str, provider_name: str, tag: Optional[str], **kwargs):
    """Export data to 3rd party provider.

    Args:
        slug(str): Slug of dataset to export.
        provider_name(str): Provider to use for export.
        tag(str): Dataset tag from which to export.

    Raises:
        errors.ParameterError: If the requested tag doesn't exist for the dataset.
        errors.DatasetNotFound: If the tagged dataset version cannot be loaded.
        errors.InvalidAccessToken: If the user supplies an empty access token.
    """
    datasets_provenance = DatasetsProvenance()

    # Provider lookup is case-insensitive
    provider_name = provider_name.lower()

    # TODO: all these callbacks are ugly, improve in #737
    config_key_secret = "access_token"  # nosec

    dataset: Optional[Dataset] = datasets_provenance.get_by_slug(slug, strict=True, immutable=True)

    provider = ProviderFactory.get_export_provider(provider_name=provider_name)

    selected_tag = None
    tags = datasets_provenance.get_all_tags(dataset)  # type: ignore

    if tag:
        selected_tag = next((t for t in tags if t.name == tag), None)

        if not selected_tag:
            raise errors.ParameterError(f"Tag '{tag}' not found for dataset '{slug}'")
    elif tags:
        # No tag given but tags exist: let the user pick one interactively
        selected_tag = prompt_tag_selection(tags)

    if selected_tag:
        # Export the dataset version the tag points to, not the current head
        dataset = datasets_provenance.get_by_id(selected_tag.dataset_id.value, immutable=True)

        if not dataset:
            raise errors.DatasetNotFound(message=f"Cannot find dataset with id: '{selected_tag.dataset_id.value}'")

    dataset = cast(Dataset, DynamicProxy(dataset))

    exporter = provider.get_exporter(dataset=dataset, tag=selected_tag, **kwargs)

    if exporter.requires_access_token():
        # Try stored credentials first; prompt (and store) only when missing
        access_token = read_credentials(section=provider_name, key=config_key_secret)

        if access_token is None:
            access_token = prompt_access_token(exporter)

            if access_token is None or len(access_token) == 0:
                raise errors.InvalidAccessToken()

            store_credentials(section=provider_name, key=config_key_secret, value=access_token)

        exporter.set_access_token(access_token)

    try:
        destination = exporter.export()
    except errors.AuthenticationError:
        # The stored token was rejected; drop it so the next attempt re-prompts
        remove_value(provider_name, config_key_secret, global_only=True)
        raise

    communication.echo(f"Exported to: {destination}")
471

472

473
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def import_dataset(
    uri: str,
    slug: Optional[str] = "",
    extract: bool = False,
    yes: bool = False,
    datadir: Optional[Path] = None,
    previous_dataset: Optional[Dataset] = None,
    delete: bool = False,
    gitlab_token: Optional[str] = None,
    **kwargs,
):
    """Import data from a 3rd party provider or another renku project.

    Args:
        uri(str): DOI or URL of dataset to import.
        slug(str): Slug to give to the imported dataset (Default value = "").
        extract(bool): Whether to extract compressed dataset data (Default value = False).
        yes(bool): Whether to skip user confirmation (Default value = False).
        datadir(Optional[Path]): Dataset's data directory (Default value = None).
        previous_dataset(Optional[Dataset]): Previously imported dataset version (Default value = None).
        delete(bool): Whether to delete files that don't exist anymore (Default value = False).
        gitlab_token(Optional[str]): Gitlab OAuth2 token (Default value = None).

    Raises:
        errors.ParameterError: If the URI can't be processed, the dataset has no
            files, or ``datadir`` is invalid / combined with ``previous_dataset``.
    """
    from renku.core.dataset.dataset_add import add_to_dataset

    def confirm_download(files):
        # Show the file listing and ask before downloading, unless ``yes`` was passed
        if yes:
            return

        headers = {"checksum": "checksum", "filename": "name", "filesize_str": "size", "filetype": "type"}
        communication.echo(tabulate(files, headers=headers, floatfmt=".2f"))
        communication.confirm("Do you wish to download this version?", abort=True, warning=True)

    def calculate_total_size(files) -> int:
        # Sum the sizes of all files whose size is known (some providers omit it)
        total_size = 0
        for file in files:
            if file.filesize is not None:
                total_size += file.filesize

        return total_size

    def remove_files(dataset):
        """Remove files that exist in ``previous_dataset`` but not in ``dataset``.

        Args:
            dataset(Dataset): Dataset to update.
        """
        if not delete or not previous_dataset:
            return

        current_paths = {str(f.entity.path) for f in dataset.files}
        previous_paths = {str(f.entity.path) for f in previous_dataset.files}
        deleted_paths = previous_paths - current_paths

        for path in deleted_paths:
            delete_dataset_file(project_context.path / path, follow_symlinks=True)

    provider = ProviderFactory.get_import_provider(uri)

    try:
        importer = provider.get_importer(gitlab_token=gitlab_token, **kwargs)
        provider_dataset: ProviderDataset = importer.fetch_provider_dataset()
    except KeyError as e:
        raise errors.ParameterError(f"Could not process '{uri}'.\nUnable to fetch metadata: {e}")
    except LookupError as e:
        raise errors.ParameterError(f"Could not process '{uri}'.\nReason: {e}")

    if not importer.provider_dataset_files:
        raise errors.ParameterError(f"Dataset '{uri}' has no files.")

    confirm_download(importer.provider_dataset_files)

    # Best-effort check for a newer remote version; failures are non-fatal
    try:
        if not importer.is_latest_version():
            communication.warn(f"Newer version found at {importer.latest_uri}")
    except (KeyError, LookupError):
        pass

    if datadir and previous_dataset:
        raise errors.ParameterError("Can't specify datadir when updating a previously imported dataset.")
    elif datadir:
        try:
            datadir = get_safe_relative_path(datadir, project_context.path)
        except ValueError as e:
            raise errors.ParameterError("Datadir must be inside repository.") from e

    # Fall back to the provider's slug when the caller didn't supply one
    slug = slug or provider_dataset.slug

    new_dataset = add_to_dataset(
        dataset_slug=slug,
        urls=[],
        importer=importer,
        create=not previous_dataset,
        force=True,  # NOTE: Force-add to include any ignored files
        extract=extract,
        overwrite=True,
        total_size=calculate_total_size(importer.provider_dataset_files),
        clear_files_before=True,
        datadir=datadir,
        storage=provider_dataset.storage,
    )

    new_dataset.update_metadata_from(provider_dataset)
    # NOTE: Remove derived_from because this is an updated and imported dataset and won't be a derivative
    new_dataset.derived_from = None

    remove_files(new_dataset)

    importer.tag_dataset(slug)
    importer.copy_extra_metadata(new_dataset)

    project_context.database.commit()
586

587

588
@inject.autoparams()
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def update_datasets(
    slugs: List[str],
    creators: Optional[str],
    include: Optional[List[str]],
    exclude: Optional[List[str]],
    ref: Optional[str],
    delete: bool,
    no_local: bool,
    no_remote: bool,
    check_data_directory: bool,
    update_all: bool,
    dry_run: bool,
    plain: bool,
    dataset_gateway: IDatasetGateway,
) -> Tuple[List[DatasetViewModel], List[DatasetFileViewModel]]:
    """Update dataset files.

    Args:
        slugs(List[str]): Slugs of datasets to update.
        creators(Optional[str]): Creators to filter dataset files by.
        include(Optional[List[str]]): Include filter for paths to update.
        exclude(Optional[List[str]]): Exclude filter for paths to update.
        ref(Optional[str]): Git reference to use for update.
        delete(bool): Whether to delete files that don't exist on remote anymore.
        no_local(bool): Whether to exclude local files from the update.
        no_remote(bool): Whether to exclude remote files from the update.
        check_data_directory(bool): Whether to check the dataset's data directory for new files.
        update_all(bool): Whether to update all datasets.
        dry_run(bool): Whether to return a preview of what would be updated.
        plain(bool): Whether plain output should be produced.
        dataset_gateway(IDatasetGateway): Injected dataset gateway.
    """
    from renku.core.dataset.providers.renku import RenkuProvider

    if not update_all and not slugs and not include and not exclude and not dry_run:
        raise errors.ParameterError("No update criteria is specified")

    imported_dataset_updates: List[Dataset] = []

    all_datasets = dataset_gateway.get_all_active_datasets()
    # Imported datasets are recognized by a ``same_as`` pointer to their origin
    imported_datasets = [d for d in all_datasets if d.same_as]

    # Validate mutually-exclusive parameter combinations up front
    if slugs and update_all:
        raise errors.ParameterError("Cannot pass dataset slugs when updating all datasets")
    elif (include or exclude) and update_all:
        raise errors.ParameterError("Cannot specify include and exclude filters when updating all datasets")
    elif (include or exclude) and slugs and any(d for d in imported_datasets if d.slug in slugs):
        raise errors.IncompatibleParametersError(first_param="--include/--exclude", second_param="imported datasets")

    slugs = slugs or [d.slug for d in all_datasets]

    # NOTE: update imported datasets
    if not include and not exclude and not no_remote:
        must_match_records = False

        # Datasets handled here are removed from ``slugs`` so the per-file
        # update below only processes the remaining ones.
        for dataset in imported_datasets:
            if dataset.slug not in slugs:
                continue

            uri = dataset.same_as.value  # type: ignore
            try:
                provider = ProviderFactory.get_import_provider(uri)
            except errors.DatasetProviderNotFound:
                continue

            record = provider.get_importer()

            if isinstance(provider, RenkuProvider) and dataset.version is not None:
                tags = dataset_gateway.get_all_tags(dataset=dataset)
                tag = next((t for t in tags if t.name == dataset.version), None)
                # NOTE: Do not update Renku dataset that are imported from a specific version
                if tag is not None and tag.dataset_id.value == dataset.id:
                    communication.echo(
                        f"Skipped updating imported Renku dataset '{dataset.slug}' with tag '{tag.name}'"
                    )
                    slugs.remove(dataset.slug)
                    continue

            if record.is_latest_version() and record.is_version_equal_to(dataset):
                # Already up to date; nothing to do
                slugs.remove(dataset.slug)
                continue

            if not dry_run:
                uri = record.latest_uri

                # NOTE: set extract to false if there are any archives present in the dataset
                extract = True
                for f in dataset.files:
                    try:
                        patoolib.get_archive_format(f.entity.path)
                    except patoolib.util.PatoolError:
                        continue
                    else:
                        extract = False
                        break

                import_dataset(
                    uri=uri, slug=dataset.slug, extract=extract, yes=True, previous_dataset=dataset, delete=delete
                )

                communication.echo(f"Updated dataset '{dataset.slug}' from remote provider")

            slugs.remove(dataset.slug)
            imported_dataset_updates.append(dataset)
    else:
        # Filters were given (or remotes excluded): per-file filtering below must match something
        must_match_records = True

    imported_dataset_updates_view_models = [DatasetViewModel.from_dataset(d) for d in imported_dataset_updates]

    if not slugs:
        # Everything was handled via whole-dataset imports above
        return imported_dataset_updates_view_models, []

    # NOTE: Exclude all imported dataset from individual file filter
    records = filter_dataset_files(
        slugs=slugs,
        creators=creators,
        include=include,
        exclude=exclude,
        ignore=[d.slug for d in imported_datasets],
        check_data_directory=check_data_directory,
    )

    if not records:
        if must_match_records and not plain:
            raise errors.ParameterError("No files matched the criteria.")
        return imported_dataset_updates_view_models, []

    # Group files by the provider that can update them; linked files are handled separately
    provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = defaultdict(list)
    unique_remotes = set()
    linked_files = []

    for file in records:
        if file.linked:
            linked_files.append(file)
        else:
            if not getattr(file, "provider", None):
                # Derive the provider from the file's origin URI when not cached on the proxy
                if file.based_on:
                    uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url
                else:
                    uri = file.source
                try:
                    file.provider = cast(
                        AddProviderInterface,
                        ProviderFactory.get_add_provider(uri),
                    )
                except errors.DatasetProviderNotFound:
                    communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.slug}")
                    continue

            provider_files[file.provider].append(file)

            if isinstance(file.provider, GitProvider):
                unique_remotes.add(file.based_on.url)

    # A single git ref only makes sense when all git files come from one remote
    if ref and len(unique_remotes) > 1:
        raise errors.ParameterError(
            "Cannot specify a reference with more than one Git repository.\n"
            "Limit list of files to be updated to one repository. See 'renku dataset update -h' for more information."
        )

    updated_files: List[DynamicProxy] = []
    deleted_files: List[DynamicProxy] = []

    if linked_files:
        updated = update_linked_files(linked_files, dry_run=dry_run)
        updated_files.extend(updated)

    # Shared mutable context that providers may use to cache state across calls
    provider_context: Dict[str, Any] = {}

    for provider, files in provider_files.items():
        # Honor the --no-remote / --no-local exclusions per provider
        if (no_remote and cast(ProviderApi, provider).is_remote) or (
            no_local and not cast(ProviderApi, provider).is_remote
        ):
            continue

        results = provider.update_files(
            files=files,
            dry_run=dry_run,
            delete=delete,
            context=provider_context,
            ref=ref,
            check_data_directory=check_data_directory,
        )
        updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE)
        deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE)

    if not dry_run:
        if deleted_files and not delete:
            communication.echo("Some files are deleted: Pass '--delete' to remove them from datasets' metadata")
        if updated_files or (deleted_files and delete):
            file_paths = {str(project_context.path / f.entity.path) for f in updated_files}
            # Force-add to include possible ignored files that are in datasets
            repository = project_context.repository
            repository.add(*file_paths, force=True)
            repository.add(project_context.pointers_path, force=True)

            _update_datasets_files_metadata(updated_files, deleted_files, delete)

        message = f"Updated {len(updated_files)} files"
        if delete:
            message += f" and deleted {len(deleted_files)} files"
        communication.echo(message)
    else:
        # Dry run: only mark deletions on the in-memory proxies for the preview
        for file in deleted_files:
            if not file.date_removed:
                file.date_removed = local_now()

    dataset_files_view_models = [
        DatasetFileViewModel.from_dataset_file(cast(DatasetFile, f), f.dataset) for f in updated_files + deleted_files
    ]
    return imported_dataset_updates_view_models, dataset_files_view_models
801

802

803
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def show_dataset(slug: str, tag: Optional[str] = None):
    """Show detailed dataset information.

    Args:
        slug(str): Slug of dataset to show details for.
        tag(str, optional): Tags for which to get the metadata (Default value = None).

    Returns:
        dict: JSON dictionary of dataset details.
    """
    provenance = DatasetsProvenance()
    dataset: Optional[Dataset] = provenance.get_by_slug(slug, strict=True)

    # Without a tag, dump the current version of the dataset
    if tag is None:
        return DatasetDetailsJson().dump(dataset)

    all_tags = provenance.get_all_tags(dataset=cast(Dataset, dataset))
    matching_tags = [t for t in all_tags if t.name == tag]

    if not matching_tags:
        raise errors.DatasetTagNotFound(tag)

    # Resolve the dataset version that the tag points to and dump that instead
    tagged_dataset = provenance.get_by_id(matching_tags[0].dataset_id.value)
    return DatasetDetailsJson().dump(tagged_dataset)
829

830

831
def add_datadir_files_to_dataset(dataset: Dataset) -> None:
    """Add all files in a datasets data directory to the dataset.

    Args:
        dataset(Dataset): The dataset to add data dir files to.
    """
    datadir = get_safe_relative_path(dataset.get_datadir(), project_context.path)

    if not datadir.exists():
        return

    # NOTE: Register files that already exist inside the data directory
    existing_files: List[Union[Path, str]] = list(get_files(datadir))
    checksums = project_context.repository.get_object_hashes(existing_files)

    paths: List[Path] = []
    new_files: List[DatasetFile] = []
    for path in cast(List[Path], existing_files):
        paths.append(path)
        new_files.append(DatasetFile.from_path(path=path, source=path, checksum=checksums.get(path)))

    if not new_files:
        return

    if check_external_storage():
        track_paths_in_storage(*paths)
    project_context.repository.add(*paths)

    dataset.add_or_update_files(new_files)
858

859

860
def set_dataset_images(dataset: Dataset, images: Optional[List[ImageObjectRequest]]):
    """Set a dataset's images.

    Args:
        dataset(Dataset): The dataset to set images on.
        images(List[ImageObjectRequest]): The images to set.

    Returns:
        True if images were set/modified.

    Raises:
        errors.DatasetImageError: If an image cannot be processed or two images share a position.
    """
    if not images:
        images = []

    # NOTE: Loop-invariant; computed once instead of being redundantly recomputed on every iteration
    image_folder = project_context.dataset_images_path / dataset.initial_identifier
    image_folder.mkdir(exist_ok=True, parents=True)
    previous_images = dataset.images or []

    dataset.images = []
    images_updated = False
    for img in images:
        try:
            img_object = img.to_image_object(owner_id=dataset.id)
        except errors.ImageError as e:
            raise errors.DatasetImageError(e) from e

        path = img_object.content_url

        if not img_object.is_remote:
            # NOTE: only copy dataset image if it's not in .renku/datasets/<id>/images/ already
            if not path.startswith(str(image_folder)):
                # NOTE(review): ``imghdr`` is deprecated (removed in Python 3.13); consider a replacement
                image_type = imghdr.what(path)
                if image_type:
                    ext = f".{image_type}"
                else:
                    # Content sniffing failed: fall back to the file's own extension
                    _, ext = os.path.splitext(path)
                target_image_path: Union[Path, str] = image_folder / f"{img_object.position}{ext}"

                image_folder.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(path, target_image_path)
            else:
                target_image_path = path

            # Store local images as paths relative to the project root
            img_object.content_url = get_relative_path(target_image_path, base=project_context.path)  # type: ignore

        if any(i.position == img_object.position for i in dataset.images):
            raise errors.DatasetImageError(f"Duplicate dataset image specified for position {img_object.position}")

        dataset.images.append(img_object)
        images_updated = True

    new_urls = [i.content_url for i in dataset.images]

    for prev in previous_images:
        # NOTE: Delete images if they were removed; remote URLs (with a netloc) are never deleted from disk
        if prev.content_url in new_urls or urllib.parse.urlparse(prev.content_url).netloc:
            continue

        path = prev.content_url
        if not os.path.isabs(path):
            path = os.path.normpath(os.path.join(project_context.path, path))

        os.remove(path)

    return images_updated or dataset.images != previous_images
×
925

926

927
def update_dataset_custom_metadata(
    dataset: Dataset,
    custom_metadata: Optional[Union[Dict, List[Dict]]],
    custom_metadata_source: Optional[str],
):
    """Update custom metadata on a dataset.

    Args:
        dataset(Dataset): The dataset to update.
        custom_metadata(Dict): Custom JSON-LD metadata to set.
        custom_metadata_source(str): The source field for the custom metadata.
    """
    # Keep annotations that came from other sources untouched
    annotations = [a for a in dataset.annotations if a.source != custom_metadata_source]

    if custom_metadata is not None and custom_metadata_source is not None:
        # Accept either a single metadata dict or a list of them
        entries = [custom_metadata] if isinstance(custom_metadata, dict) else custom_metadata
        annotations.extend(
            Annotation(id=Annotation.generate_id(), body=entry, source=custom_metadata_source) for entry in entries
        )

    dataset.annotations = annotations
1✔
949

950

951
@inject.autoparams("dataset_gateway")
def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dataset_slug: Optional[str] = None):
    """Move files and their metadata from one or more datasets to a target dataset.

    Args:
        dataset_gateway(IDatasetGateway):Injected dataset gateway.
        files(Dict[Path, Path]): Files to move
        to_dataset_slug(Optional[str], optional): Target dataset (Default value = None)
    """
    # Work on copies so provenance objects aren't mutated until explicitly committed below
    datasets = [d.copy() for d in dataset_gateway.get_all_active_datasets()]

    to_dataset: Optional[Dataset] = None
    if to_dataset_slug:
        # NOTE: Use the same dataset object or otherwise a race happens if dataset is in both source and destination
        to_dataset = next(d for d in datasets if d.slug == to_dataset_slug)
    # Datasets whose metadata changed, keyed by slug so each is committed only once
    modified_datasets: Dict[str, Dataset] = {}

    progress_name = "Updating dataset metadata"
    communication.start_progress(progress_name, total=len(files))
    try:
        checksums = project_context.repository.get_object_hashes(
            [file.relative_to(project_context.path) for file in files.values()]
        )
        for src, dst in files.items():
            src = src.relative_to(project_context.path)
            dst = dst.relative_to(project_context.path)
            # NOTE: Files are moved at this point, so, we can use dst
            new_dataset_file = DatasetFile.from_path(dst, checksum=checksums.get(dst))

            for dataset in datasets:
                # Remove the source path from any dataset that contained it
                removed = dataset.unlink_file(src, missing_ok=True)
                if removed:
                    modified_datasets[dataset.slug] = dataset
                    # Carry over provenance from the removed file entry
                    new_dataset_file.based_on = removed.based_on
                    new_dataset_file.source = removed.source

                    # Without an explicit target dataset, keep the file in its original dataset
                    # when it is a linked file or still lives inside that dataset's data directory
                    if not to_dataset and (
                        new_dataset_file.linked
                        or is_subpath(project_context.path / dst, project_context.path / dataset.get_datadir())
                    ):
                        dataset.add_or_update_files(new_dataset_file)

                # NOTE: Update dataset if it contains a destination that is being overwritten
                modified = dataset.find_file(dst)
                added = is_subpath(project_context.path / dst, project_context.path / dataset.get_datadir())
                if modified or added:
                    modified_datasets[dataset.slug] = dataset
                    dataset.add_or_update_files(new_dataset_file)

            if to_dataset:
                to_dataset.add_or_update_files(new_dataset_file)

            communication.update_progress(progress_name, amount=1)
    finally:
        communication.finalize_progress(progress_name)

    # Persist all metadata changes in the datasets' provenance
    datasets_provenance = DatasetsProvenance()
    modified_dataset_values = list(modified_datasets.values())
    creator = get_git_user(repository=project_context.repository)
    for modified_dataset in modified_dataset_values:
        datasets_provenance.add_or_update(modified_dataset, creator=creator)
    if to_dataset and to_dataset not in modified_dataset_values:
        datasets_provenance.add_or_update(to_dataset, creator=creator)
×
1014

1015

1016
def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
    """Refresh dataset metadata for updated files and, optionally, unlink deleted ones.

    Args:
        updated_files(List[DynamicProxy]): Files whose content changed; each proxy carries a ``dataset`` attribute.
        deleted_files(List[DynamicProxy]): Files that were removed from their source.
        delete(bool): Whether to also remove deleted files from the datasets' metadata.
    """
    # Modified datasets keyed by slug so each is committed to provenance only once
    modified_datasets = {}
    checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
    for file in updated_files:
        new_file = DatasetFile.from_path(
            path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
        )
        # Unwrap DynamicProxy to store the underlying dataset object, not the proxy
        modified_datasets[file.dataset.slug] = (
            file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
        )
        file.dataset.add_or_update_files(new_file)

    if delete:
        for file in deleted_files:
            modified_datasets[file.dataset.slug] = (
                file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
            )
            file.dataset.unlink_file(file.entity.path)

    # Persist all touched datasets in the provenance
    datasets_provenance = DatasetsProvenance()
    for dataset in modified_datasets.values():
        datasets_provenance.add_or_update(dataset, creator=get_git_user(repository=project_context.repository))
1038

1039

1040
def update_linked_files(records: List[DynamicProxy], dry_run: bool) -> List[DynamicProxy]:
    """Update files linked to other files in the project.

    Args:
        records(List[DynamicProxy]): File records to update.
        dry_run(bool): Whether to return a preview of what would be updated.
    """
    result = []

    for record in records:
        if not record.linked:
            continue

        try:
            updated, checksum = is_linked_file_updated(path=record.entity.path)
        except errors.ExternalFileNotFound as e:
            # In dry-run mode a missing link target is only reported, not fatal
            if not dry_run:
                raise
            communication.warn(str(e))
            continue

        if updated:
            if not dry_run:
                update_linked_file(path=record.entity.path, checksum=checksum)
            result.append(record)

    return result
1065

1066

1067
@inject.autoparams("dataset_gateway")
def filter_dataset_files(
    dataset_gateway: IDatasetGateway,
    slugs: Optional[List[str]] = None,
    tag: Optional[str] = None,
    creators: Optional[Union[str, List[str], Tuple[str]]] = None,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    ignore: Optional[List[str]] = None,
    immutable: bool = False,
    check_data_directory: bool = False,
) -> List[DynamicProxy]:
    """Filter dataset files by specified filters.

    Args:
        dataset_gateway(IDatasetGateway):Injected dataset gateway.
        slugs(Optional[List[str]]): Filter by specified dataset slugs (Default value = None).
        tag(Optional[str]): Filter by specified tag (Default value = None).
        creators(Optional[Union[str, List[str], Tuple[str]]]): Filter by creators (Default value = None).
        include(Optional[List[str]]): Tuple containing patterns to which include from result (Default value = None).
        exclude(Optional[List[str]]): Tuple containing patterns to which exclude from result (Default value = None).
        ignore(Optional[List[str]]): Ignored datasets (Default value = None).
        immutable(bool): Return immutable copies of dataset objects (Default value = False).
        check_data_directory(bool): Whether to check for new files in dataset's data directory that aren't in the
            dataset yet (Default value = False).
    Returns:
        List[DynamicProxy]: List of filtered files sorted by date added.

    Raises:
        errors.ParameterError: If any of the requested slugs doesn't match an existing dataset.
    """

    def should_include(filepath: Path) -> bool:
        """Check if file matches one of include filters and not in exclude filter."""
        # Exclusion takes precedence over inclusion
        if exclude:
            for pattern in exclude:
                if filepath.match(pattern):
                    return False

        if include:
            for pattern in include:
                if filepath.match(pattern):
                    return True
            # An include list was given and nothing matched
            return False

        return True

    # Creators may be passed as a comma-separated string or as a list/tuple of names
    creators_set = set()
    if isinstance(creators, str):
        creators_set = set(creators.split(","))
    elif isinstance(creators, list) or isinstance(creators, tuple):
        creators_set = set(creators)

    records = []
    # Track requested slugs that never match a dataset so they can be reported as errors
    unused_slugs = set(slugs) if slugs is not None else set()

    if ignore:
        unused_slugs = unused_slugs - set(ignore)

    for dataset in dataset_gateway.get_all_active_datasets():
        if (slugs and dataset.slug not in slugs) or (ignore and dataset.slug in ignore):
            continue

        if tag:
            # Replace the dataset with its version at the given tag; skip if the tag doesn't exist for it
            dataset = get_dataset_by_tag(dataset=dataset, tag=tag)  # type: ignore
            if not dataset:
                continue

        if not immutable:
            # Work on a copy so callers can't accidentally mutate provenance objects
            dataset = dataset.copy()

        if unused_slugs:
            unused_slugs.remove(dataset.slug)

        if creators_set:
            dataset_creators = {creator.name for creator in dataset.creators}
            # All requested creators must be present on the dataset
            if not creators_set.issubset(dataset_creators):
                continue

        for file in dataset.files:
            if not should_include(Path(file.entity.path)):
                continue

            record = DynamicProxy(file)
            record.dataset = DynamicProxy(dataset)
            records.append(record)

        if not check_data_directory:
            continue

        # Also pick up files in the data directory that aren't in the dataset's metadata yet
        for root, _, files in os.walk(project_context.path / dataset.get_datadir()):
            current_folder = Path(root)
            for current_file in files:
                file_path = get_safe_relative_path(current_folder / current_file, project_context.path)
                if should_include(file_path) and not dataset.find_file(file_path):
                    # New file in dataset folder
                    record = DynamicProxy(DatasetFile.from_path(file_path))
                    record.dataset = dataset
                    records.append(record)

    if unused_slugs:
        unused_slugs_str = ", ".join(unused_slugs)
        raise errors.ParameterError(f"These datasets don't exist: {unused_slugs_str}")

    return sorted(records, key=lambda r: r.date_added)
1169

1170

1171
def download_file(file: DatasetFile, storage: "IStorage") -> List[DatasetFile]:
    """Download a dataset file and retrieve its missing metadata (if any).

    Args:
        file(DatasetFile): Dataset file to download.
        storage: Dataset's cloud storage (an instance of ``IStorage``).

    Returns:
         List[DatasetFile]: A list with the updated file if its metadata was missing; an empty list otherwise.

    Raises:
        errors.DatasetImportError: If the file has no remote URI to download from.
    """
    if not file.based_on:
        raise errors.DatasetImportError(f"Dataset file doesn't have a URI: {file.entity.path}")

    destination = project_context.path / file.entity.path
    destination.resolve().parent.mkdir(parents=True, exist_ok=True)

    # NOTE: Don't check if destination file exists. ``IStorage.copy`` won't copy a file if it exists and is not
    # modified.

    communication.start_progress(name=file.entity.path, total=1)
    try:
        storage.download(file.based_on.url, destination)
        communication.update_progress(name=file.entity.path, amount=1)
    finally:
        communication.finalize_progress(name=file.entity.path)

    checksum_ok = file.has_valid_checksum()
    size_ok = file.has_valid_size()

    # NOTE: File has no missing information
    if checksum_ok and size_ok:
        return []

    if checksum_ok:
        entity = file.entity
        remote_entity = file.based_on
    else:
        # Recompute the checksum from the downloaded content
        checksum = hash_file(destination, hash_type="md5") or NON_EXISTING_ENTITY_CHECKSUM
        entity = Entity(path=file.entity.path, checksum=checksum)
        remote_entity = RemoteEntity(checksum=checksum, url=file.based_on.url, path=file.based_on.path)

    size = file.size if size_ok else get_file_size(destination)

    return [
        DatasetFile(
            entity=entity,
            based_on=remote_entity,
            size=size,
            date_added=file.date_added,
            date_removed=file.date_removed,
            source=file.source,
        )
    ]
1222

1223

1224
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def pull_cloud_storage(slug: str, location: Optional[Path] = None) -> None:
    """Pull/copy data for a cloud storage to a dataset's data directory or a specified location.

    Args:
        slug(str): Slug of the dataset
        location(Optional[Path]): A directory to copy data to (Default value = None).
    """
    dataset, datadir = _get_dataset_with_cloud_storage(slug=slug)

    # NOTE: Try to unmount the path in case it was mounted before
    unmount_path(datadir)

    if location:
        if not is_path_empty(datadir):
            communication.confirm(
                f"Dataset's data directory will be removed: {dataset.get_datadir()}. Do you want to continue?",
                abort=True,
                warning=True,
            )
        # Redirect the data directory to the requested location
        create_symlink(target=location, symlink_path=datadir, overwrite=True)

    storage = ProviderFactory.get_pull_provider(uri=dataset.storage).get_storage()

    downloaded = parallel_execute(download_file, dataset.files, rate=5, storage=storage)

    if downloaded:
        # Some files had missing metadata that was filled in during download; persist it
        dataset.add_or_update_files(downloaded)
        DatasetsProvenance().add_or_update(dataset, creator=get_git_user(repository=project_context.repository))
        project_context.database.commit()
1255

1256

1257
def store_dataset_data_location(dataset: Dataset, location: Optional[Path]) -> None:
    """Store data location for a dataset in the config file."""
    section = "dataset-locations"

    # An empty location clears any previously stored entry
    if location:
        set_value(section=section, key=dataset.slug, value=get_absolute_path(location))
    else:
        remove_value(section=section, key=dataset.slug)
1266

1267

1268
def read_dataset_data_location(dataset: Dataset) -> Optional[str]:
    """Read data location for a dataset in the config file."""
    # Only the local (project-level) configuration is consulted
    location = get_value(section="dataset-locations", key=dataset.slug, config_filter=ConfigFilter.LOCAL_ONLY)
    return location
1271

1272

1273
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def mount_cloud_storage(slug: str, existing: Optional[Path], yes: bool) -> None:
    """Mount a cloud storage to a dataset's data directory.

    Args:
        slug(str): Slug of the dataset
        existing(Optional[Path]): An existing mount point to use instead of actually mounting the backend storage.
        yes(bool): Don't prompt when removing non-empty dataset's data directory.
    """
    dataset, datadir = _get_dataset_with_cloud_storage(slug=slug)

    # NOTE: Try to unmount the path in case it was mounted before
    unmount_path(datadir)

    if not is_path_empty(datadir) and not yes:
        communication.confirm(
            f"Dataset's data directory will be removed: {dataset.get_datadir()}. Do you want to continue?",
            abort=True,
            warning=True,
        )

    if existing:
        # Symlink to an already-mounted location instead of mounting ourselves
        create_symlink(target=existing, symlink_path=datadir, overwrite=True)
        return

    # Start from an empty directory before mounting
    delete_path(datadir)
    datadir.mkdir(parents=True, exist_ok=True)

    provider = ProviderFactory.get_mount_provider(uri=dataset.storage)
    credentials = provider.get_credentials()
    prompt_for_credentials(credentials)
    backend = provider.get_storage(credentials=credentials)

    with communication.busy(f"Mounting {provider.uri}"):
        backend.mount(datadir)
1308

1309

1310
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def unmount_cloud_storage(slug: str) -> None:
    """Unmount a cloud storage from a dataset's data directory.

    Args:
        slug(str): Slug of the dataset
    """
    # Dataset object itself isn't needed; only its data directory is unmounted
    _, datadir = _get_dataset_with_cloud_storage(slug=slug)
    unmount_path(datadir)
1319

1320

1321
def _get_dataset_with_cloud_storage(slug: str) -> Tuple[Dataset, Path]:
    """Return the dataset with the given slug and its data directory, requiring a storage backend.

    Raises:
        errors.ParameterError: If the dataset has no storage backend configured.
    """
    dataset = DatasetsProvenance().get_by_slug(slug=slug, strict=True)

    if not dataset.storage:
        raise errors.ParameterError(f"Dataset '{slug}' doesn't have a storage backend")

    return dataset, project_context.path / dataset.get_datadir()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc