• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 18715077147

22 Oct 2025 11:45AM UTC coverage: 86.653% (-0.1%) from 86.802%
18715077147

Pull #945

github

web-flow
Merge 93c973951 into 5cc2b39de
Pull Request #945: feat: add support for OpenBIS datasets

70 of 139 new or added lines in 12 files covered. (50.36%)

4 existing lines in 3 files now uncovered.

22781 of 26290 relevant lines covered (86.65%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.11
/components/renku_data_services/notebooks/api/schemas/cloud_storage.py
1
"""Schema for cloudstorage config."""
2

3
import json
2✔
4
from configparser import ConfigParser
2✔
5
from io import StringIO
2✔
6
from pathlib import PurePosixPath
2✔
7
from typing import Any, Final, Optional, Protocol, Self
2✔
8

9
from kubernetes import client
2✔
10
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
2✔
11

12
from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest
2✔
13
from renku_data_services.storage.models import CloudStorage
2✔
14

15
# Module-level shortcut to the kubernetes ApiClient serializer; in this module it is applied to the
# kubernetes objects (PVC, secret, volume, volume mount) before they are embedded in JSON patches.
_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization
2✔
16

17

18
class RCloneStorageRequest(Schema):
    """Schema of a request to mount an RClone based storage."""

    class Meta:
        """Schema options."""

        # Fields that are not declared below are dropped when loading.
        unknown = EXCLUDE

    source_path = fields.Str()
    target_path = fields.Str()
    configuration = fields.Dict(keys=fields.Str(), values=fields.Raw(), load_default=None, allow_none=True)
    storage_id = fields.Str(load_default=None, allow_none=True)
    readonly = fields.Bool(load_default=True, allow_none=False)

    @validates_schema
    def validate_storage(self, data: dict, **kwargs: dict) -> None:
        """Reject requests that combine a storage ID with an explicit source or target path."""
        if not data.get("storage_id"):
            return
        for path_field in ("source_path", "target_path"):
            if data.get(path_field):
                raise ValidationError("'storage_id' cannot be used together with 'source_path' or 'target_path'")
×
37

38

39
class RCloneStorageRequestOverride(Protocol):
2✔
40
    """A small dataclass for handling overrides to the data connector requests."""
41

42
    source_path: str | None = None
2✔
43
    target_path: str | None = None
2✔
44
    configuration: dict[str, Any] | None = None
2✔
45
    readonly: bool | None = None
2✔
46

47

48
class RCloneStorage(ICloudStorageRequest):
2✔
49
    """RClone based storage."""
50

51
    # Annotation key placed on the PVC built by pvc(); its value is the base name, which is also
    # the name secret() gives to the secret when it is called with the same base name.
    pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName"
2✔
52

53
    def __init__(
        self,
        source_path: str,
        configuration: dict[str, Any],
        readonly: bool,
        mount_folder: str,
        name: Optional[str],
        secrets: dict[str, str],  # Mapping between secret ID (key) and secret name (value)
        storage_class: str,
        user_secret_key: str | None = None,
    ) -> None:
        """Store the given storage settings as they are, the configuration is not validated."""
        # What is mounted and how it is reached.
        self.source_path = source_path
        self.configuration = configuration
        self.name = name
        self.secrets = secrets
        self.user_secret_key = user_secret_key
        # How the storage is exposed.
        self.mount_folder = mount_folder
        self.readonly = readonly
        self.storage_class = storage_class
        # Only set once get_manifest_patch() has been called.
        self.base_name: str | None = None
×
74

75
    @classmethod
    async def storage_from_schema(
        cls,
        data: dict[str, Any],
        work_dir: PurePosixPath,
        saved_storage: CloudStorage | None,
        storage_class: str,
        user_secret_key: str | None = None,
    ) -> Self:
        """Create storage object from request.

        Args:
            data: The loaded storage request.
            work_dir: The directory the target path of the storage is resolved against.
            saved_storage: A previously saved storage. When given, it provides the name, the paths
                and the read-only flag, and its configuration is updated with the one from the request.
            storage_class: The storage class used for the PVC of this storage.
            user_secret_key: Optional secret key stored on the created instance.

        Returns:
            The storage instance, created without validating the configuration.

        Raises:
            KeyError: If no saved storage is given and the request is missing
                ``source_path``, ``target_path`` or ``configuration``.
        """
        name = None
        if saved_storage:
            # The request may carry ``"configuration": None``, so ``.get(..., {})`` is not enough here.
            request_configuration = data.get("configuration") or {}
            configuration = {**saved_storage.configuration.model_dump(), **request_configuration}
            # NOTE(review): assumes CloudStorage exposes ``source_path`` and ``target_path`` - confirm
            # against renku_data_services.storage.models. Without these assignments the two names
            # were unbound below whenever a saved storage was used.
            source_path = saved_storage.source_path
            target_path = saved_storage.target_path
            readonly = saved_storage.readonly
            name = saved_storage.name
        else:
            source_path = data["source_path"]
            target_path = data["target_path"]
            configuration = data["configuration"]
            readonly = data.get("readonly", True)

        # NOTE: This is used only in Renku v1, there we do not save secrets for storage
        secrets: dict[str, str] = {}
        mount_folder = str(work_dir / target_path)
        return cls(
            source_path=source_path,
            configuration=configuration,
            readonly=readonly,
            mount_folder=mount_folder,
            name=name,
            storage_class=storage_class,
            secrets=secrets,
            user_secret_key=user_secret_key,
        )
109

110
    def pvc(
        self,
        base_name: str,
        namespace: str,
        labels: dict[str, str] | None = None,
        annotations: dict[str, str] | None = None,
    ) -> client.V1PersistentVolumeClaim:
        """Build the persistent volume claim through which the cloud storage is mounted."""
        access_mode = "ReadOnlyMany" if self.readonly else "ReadWriteMany"
        # Caller supplied annotations and labels take precedence over the defaults.
        metadata = client.V1ObjectMeta(
            name=base_name,
            namespace=namespace,
            annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}),
            labels={"name": base_name} | (labels or {}),
        )
        spec = client.V1PersistentVolumeClaimSpec(
            access_modes=[access_mode],
            resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}),
            storage_class_name=self.storage_class,
        )
        return client.V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=metadata,
            spec=spec,
        )
133

134
    def volume_mount(self, base_name: str) -> client.V1VolumeMount:
        """Build the volume mount placing the storage at the mount folder."""
        mount = client.V1VolumeMount(mount_path=self.mount_folder, name=base_name, read_only=self.readonly)
        return mount
141

142
    def volume(self, base_name: str) -> client.V1Volume:
        """Build the volume entry referencing the PVC with the same name."""
        claim = client.V1PersistentVolumeClaimVolumeSource(claim_name=base_name, read_only=self.readonly)
        return client.V1Volume(name=base_name, persistent_volume_claim=claim)
150

151
    def secret(
        self,
        base_name: str,
        namespace: str,
        labels: dict[str, str] | None = None,
        annotations: dict[str, str] | None = None,
        user_secret_key: str | None = None,
    ) -> client.V1Secret:
        """Build the secret holding the rclone configuration and the mount options."""
        remote_name = self.name or base_name
        string_data = {
            "remote": remote_name,
            "remotePath": self.source_path,
            "configData": self.config_string(remote_name),
        }
        string_data.update(self.mount_options())
        # The key passed to this call wins over the one stored on the instance; the latter is the
        # only source when this method is reached without an explicit key (e.g. via get_manifest_patch).
        secret_key = user_secret_key or self.user_secret_key
        if secret_key:
            string_data["secretKey"] = secret_key
        metadata = client.V1ObjectMeta(
            name=base_name,
            namespace=namespace,
            annotations=annotations,
            labels={"name": base_name} | (labels or {}),
        )
        return client.V1Secret(
            api_version="v1",
            kind="Secret",
            metadata=metadata,
            string_data=string_data,
        )
183

184
    def get_manifest_patch(
        self,
        base_name: str,
        namespace: str,
        labels: dict[str, str] | None = None,
        annotations: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Build the JSON patch adding the PVC, secret, volume mount and volume to the server manifest."""
        self.base_name = base_name
        pvc_value = _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations))
        secret_value = _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations))
        mount_value = _sanitize_for_serialization(self.volume_mount(base_name))
        volume_value = _sanitize_for_serialization(self.volume(base_name))
        operations = [
            {"op": "add", "path": f"/{base_name}-pv", "value": pvc_value},
            {"op": "add", "path": f"/{base_name}-secret", "value": secret_value},
            {
                "op": "add",
                "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
                "value": mount_value,
            },
            {
                "op": "add",
                "path": "/statefulset/spec/template/spec/volumes/-",
                "value": volume_value,
            },
        ]
        return [{"type": "application/json-patch+json", "patch": operations}]
×
222

223
    def config_string(self, name: str) -> str:
        """Convert configuration object to string representation.

        Needed to create RClone compatible INI files. The storage types rclone does not know
        (polybox, switchDrive, openbis and the ``Switch`` s3 provider) are rewritten first.

        NOTE: The rewrite is applied to ``self.configuration`` in place.

        Args:
            name: The name of the INI section the configuration is written to.

        Returns:
            The configuration as a single INI section.

        Raises:
            ValidationError: If the configuration is empty, or if an openbis storage has neither
                a ``session_token`` nor a ``pass``.
        """
        if not self.configuration:
            raise ValidationError("Missing configuration for cloud storage")

        # TODO Use RCloneValidator.get_real_configuration(...) instead.
        # Transform configuration for polybox, switchDrive, openbis or sftp
        storage_type = self.configuration.get("type", "")
        access = self.configuration.get("provider", "")

        if storage_type == "polybox" or storage_type == "switchDrive":
            self.configuration["type"] = "webdav"
            self.configuration["provider"] = ""
            # NOTE: Without the vendor field mounting storage and editing files results in the modification
            # time for touched files to be temporarily set to `1999-09-04` which causes the text
            # editor to complain that the file has changed and whether it should overwrite new changes.
            self.configuration["vendor"] = "owncloud"
        elif storage_type == "s3" and access == "Switch":
            # Switch is a fake provider we add for users, we need to replace it since rclone itself
            # doesn't know it
            self.configuration["provider"] = "Other"
        elif storage_type == "openbis":
            self.configuration["type"] = "sftp"
            self.configuration["port"] = "2222"
            self.configuration["user"] = "?"
            # The session token, when present, is used as the password. The fallback to an existing
            # "pass" must not be evaluated eagerly: a configuration with only a session token would
            # otherwise fail with a KeyError.
            if "session_token" in self.configuration:
                self.configuration["pass"] = self.configuration.pop("session_token")
            elif "pass" not in self.configuration:
                raise ValidationError("Missing session token for openbis storage")

        if storage_type == "sftp" or storage_type == "openbis":
            # Do not allow retries for sftp
            # Reference: https://rclone.org/docs/#globalconfig
            self.configuration["override.low_level_retries"] = 1

        if access == "shared" and storage_type == "polybox":
            self.configuration["url"] = "https://polybox.ethz.ch/public.php/webdav/"
        elif access == "shared" and storage_type == "switchDrive":
            self.configuration["url"] = "https://drive.switch.ch/public.php/webdav/"
        elif access == "personal" and storage_type == "polybox":
            self.configuration["url"] = "https://polybox.ethz.ch/remote.php/webdav/"
        elif access == "personal" and storage_type == "switchDrive":
            self.configuration["url"] = "https://drive.switch.ch/remote.php/webdav/"

        # Extract the user from the public link
        if access == "shared" and storage_type in {"polybox", "switchDrive"}:
            public_link = self.configuration.get("public_link", "")
            # A trailing slash would otherwise result in an empty user.
            user_identifier = public_link.rstrip("/").split("/")[-1]
            self.configuration["user"] = user_identifier

        # Interpolation is disabled so that values containing "%" (e.g. secrets) are accepted and
        # written out verbatim instead of being rejected as invalid interpolation syntax.
        parser = ConfigParser(interpolation=None)
        parser.add_section(name)

        def _stringify(value: Any) -> str:
            # Booleans are written lowercase rather than as Python's "True"/"False".
            if isinstance(value, bool):
                return "true" if value else "false"
            return str(value)

        for k, v in self.configuration.items():
            parser.set(name, k, _stringify(v))
        stringio = StringIO()
        parser.write(stringio)
        return stringio.getvalue()
×
286

287
    def with_override(self, override: RCloneStorageRequestOverride) -> "RCloneStorage":
        """Return a new storage in which the values set on the override replace the current ones."""
        # The read-only flag is compared against None so that an explicit False is honoured;
        # the other fields fall back to the current value whenever the override is empty.
        readonly = self.readonly if override.readonly is None else override.readonly
        return RCloneStorage(
            source_path=override.source_path or self.source_path,
            mount_folder=override.target_path or self.mount_folder,
            readonly=readonly,
            configuration=override.configuration or self.configuration,
            name=self.name,
            secrets=self.secrets,
            storage_class=self.storage_class,
            user_secret_key=self.user_secret_key,
        )
299

300
    def mount_options(self) -> dict[str, str]:
        """Return the additional, JSON encoded mount options required by this storage."""
        if not self.configuration:
            raise ValidationError("Missing configuration for cloud storage")

        options: dict[str, str] = {}
        # Only storages of type "doi" need extra options.
        if self.configuration.get("type", "") == "doi":
            options["vfsOpt"] = json.dumps({"CacheMode": "full"})
            options["mountOpt"] = json.dumps({"AttrTimeout": "41s"})
        return options
×
318

319
    def __repr__(self) -> str:
        """Override to make sure no secrets or sensitive configuration gets printed in logs.

        Only the name, the source path, the mount folder and the read-only flag are included.
        """
        # Report the class of the object itself rather than the name of the request schema.
        return (
            f"{type(self).__name__}(name={self.name}, source_path={self.source_path}, "
            f"mount_folder={self.mount_folder}, readonly={self.readonly})"
        )
325

326

327
class LaunchNotebookResponseCloudStorage(RCloneStorageRequest):
    """Cloud storage entry of a notebook launch response."""

    class Meta:
        """Restrict the schema to the listed fields."""

        fields = ("remote", "mount_folder", "type")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc