• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 18715077147

22 Oct 2025 11:45AM UTC coverage: 86.653% (-0.1%) from 86.802%
18715077147

Pull #945

github

web-flow
Merge 93c973951 into 5cc2b39de
Pull Request #945: feat: add support for OpenBIS datasets

70 of 139 new or added lines in 12 files covered. (50.36%)

4 existing lines in 3 files now uncovered.

22781 of 26290 relevant lines covered (86.65%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/components/renku_data_services/storage/rclone.py
1
"""Apispec schemas for storage service."""
2

3
import asyncio
2✔
4
import json
2✔
5
import tempfile
2✔
6
from collections.abc import Generator
2✔
7
from pathlib import Path
2✔
8
from typing import TYPE_CHECKING, Any, NamedTuple, Union, cast
2✔
9

10
from pydantic import BaseModel, Field, ValidationError
2✔
11

12
from renku_data_services import errors
2✔
13
from renku_data_services.app_config import logging
2✔
14
from renku_data_services.storage.rclone_patches import BANNED_SFTP_OPTIONS, BANNED_STORAGE, apply_patches
2✔
15

16
logger = logging.getLogger(__name__)
2✔
17

18
if TYPE_CHECKING:
2✔
19
    from renku_data_services.storage.models import RCloneConfig
×
20

21

22
class ConnectionResult(NamedTuple):
    """Outcome of an RClone connection test against a cloud storage.

    Attributes:
        success: Whether the connection test succeeded.
        error: Error text reported for the test.
    """

    success: bool
    error: str
2✔
27

28

29
class RCloneValidator:
    """Validates RClone configurations against the bundled provider schema and runs rclone checks."""

    def __init__(self) -> None:
        """Load the bundled schema file, apply the patches and index the providers by prefix.

        Raises:
            pydantic.ValidationError: If an entry of the schema cannot be parsed as a provider.
        """
        schema_path = Path(__file__).parent / "rclone_schema.autogenerated.json"
        # Read the schema as UTF-8 explicitly instead of relying on the locale's default encoding.
        with open(schema_path, encoding="utf-8") as f:
            spec = json.load(f)

        apply_patches(spec)

        self.providers: dict[str, RCloneProviderSchema] = {}

        for provider_config in spec:
            try:
                provider_schema = RCloneProviderSchema.model_validate(provider_config)
            except ValidationError:
                logger.error("Couldn't load RClone config: %s", provider_config)
                raise
            self.providers[provider_schema.prefix] = provider_schema

    def validate(self, configuration: Union["RCloneConfig", dict[str, Any]], keep_sensitive: bool = False) -> None:
        """Validate an RClone config with the provider schema matching its `type`.

        The configuration is modified in place by the provider's validation.

        Raises:
            errors.ValidationError: If the storage type is missing, banned or unknown, or the config is invalid.
        """
        provider = self.get_provider(configuration)
        provider.validate_config(configuration, keep_sensitive=keep_sensitive)

    def validate_sensitive_data(
        self, configuration: Union["RCloneConfig", dict[str, Any]], sensitive_data: dict[str, str]
    ) -> None:
        """Validates whether the provided sensitive data is marked as sensitive in the rclone schema.

        Raises:
            errors.ValidationError: For the first entry whose value is empty or whose key is not the
                name of a sensitive option of the configuration's provider.
        """
        sensitive_names = {option.name for option in self.get_provider(configuration).sensitive_options}
        for key, value in sensitive_data.items():
            # Entries with an empty value are rejected the same way as non-sensitive keys.
            if len(value) == 0 or key not in sensitive_names:
                raise errors.ValidationError(message=f"The '{key}' property is not marked as sensitive.")

    def get_real_configuration(self, configuration: Union["RCloneConfig", dict[str, Any]]) -> dict[str, Any]:
        """Converts a Renku rclone configuration to a real rclone config.

        Works on a copy, the passed configuration is left untouched.

        Raises:
            KeyError: If `type` is missing, or `session_token` is missing from an `openbis` configuration.
        """
        real_config = dict(configuration)
        storage_type = real_config["type"]

        if storage_type == "s3" and real_config.get("provider") == "Switch":
            # Switch is a fake provider we add for users, we need to replace it since rclone itself
            # doesn't know it
            real_config["provider"] = "Other"
        elif storage_type == "openbis":
            # openbis is not an rclone storage type; it is rewritten to an sftp remote that
            # uses the session token as password.
            real_config["type"] = "sftp"
            real_config["port"] = "2222"
            real_config["user"] = "?"
            real_config["pass"] = real_config.pop("session_token")
        return real_config

    @staticmethod
    def _write_temp_config(configuration: Union["RCloneConfig", dict[str, Any]]) -> Path:
        """Write the configuration as remote `[temp]` to a temporary rclone config file and return its path.

        The caller is responsible for deleting the returned file.
        """
        body = "\n".join(f"{k}={v}" for k, v in configuration.items())
        # delete=False: the file has to outlive this `with` block so that rclone can open it by name.
        with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") as f:
            f.write(f"[temp]\n{body}")
        return Path(f.name)

    async def test_connection(
        self, configuration: Union["RCloneConfig", dict[str, Any]], source_path: str
    ) -> "ConnectionResult":
        """Tests connecting with an RClone config by running `rclone lsf` on `source_path`.

        Returns:
            A ConnectionResult whose `success` reflects rclone's exit code and whose `error` is rclone's
            stderr output, or a failed result with the validation message if the storage type is not usable.
        """
        try:
            self.get_provider(configuration)
        except errors.ValidationError as e:
            return ConnectionResult(False, str(e))

        # Obscure configuration and transform if needed
        obscured_config = await self.obscure_config(self.get_real_configuration(configuration))
        transformed_config = self.transform_polybox_switchdriver_config(obscured_config)

        config_path = self._write_temp_config(transformed_config)
        try:
            args = [
                "lsf",
                "--low-level-retries=1",  # Connection tests should fail fast.
                "--retries=1",  # Connection tests should fail fast.
                "--config",
                str(config_path),
                f"temp:{source_path}",
            ]
            # Handle SFTP retries, see https://github.com/SwissDataScienceCenter/renku-data-services/issues/893
            # NOTE(review): this repeats the unconditional --low-level-retries flag above.
            storage_type = cast(str, configuration.get("type"))
            if storage_type == "sftp":
                args.extend(["--low-level-retries", "1"])
            proc = await asyncio.create_subprocess_exec(
                "rclone",
                *args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            _, error = await proc.communicate()
        finally:
            # The file contains credentials, never leave it behind.
            config_path.unlink(missing_ok=True)
        return ConnectionResult(success=proc.returncode == 0, error=error.decode())

    async def obscure_config(
        self, configuration: Union["RCloneConfig", dict[str, Any]]
    ) -> Union["RCloneConfig", dict[str, Any]]:
        """Obscure secrets in rclone config."""
        provider = self.get_provider(configuration)
        return await provider.obscure_password_options(configuration)

    def remove_sensitive_options_from_config(self, configuration: Union["RCloneConfig", dict[str, Any]]) -> None:
        """Remove sensitive fields from a config, e.g. when turning a private storage public."""
        self.get_provider(configuration).remove_sensitive_options_from_config(configuration)

    def get_provider(self, configuration: Union["RCloneConfig", dict[str, Any]]) -> "RCloneProviderSchema":
        """Get the provider schema for the `type` of a configuration.

        Raises:
            errors.ValidationError: If `type` is missing, is a banned storage or is not a known provider.
        """
        storage_type = cast(str, configuration.get("type"))

        if storage_type is None:
            raise errors.ValidationError(
                message="Expected a `type` field in the RClone configuration, but didn't find it."
            )
        if storage_type in BANNED_STORAGE:
            raise errors.ValidationError(message=f"Storage '{storage_type}' is not supported.")

        provider = self.providers.get(storage_type)
        if provider is None:
            raise errors.ValidationError(message=f"RClone provider '{storage_type}' does not exist.")
        return provider

    def asdict(self) -> list[dict[str, Any]]:
        """Return the schema of all providers as a list of dicts."""
        return [provider.model_dump(exclude_none=True, by_alias=True) for provider in self.providers.values()]

    def get_private_fields(
        self, configuration: Union["RCloneConfig", dict[str, Any]]
    ) -> Generator["RCloneOption", None, None]:
        """Get private field descriptions for storage."""
        provider = self.get_provider(configuration)
        return provider.get_private_fields(configuration)

    async def get_doi_metadata(self, configuration: Union["RCloneConfig", dict[str, Any]]) -> "RCloneDOIMetadata":
        """Returns the metadata of a DOI remote as reported by `rclone backend metadata`.

        Raises:
            errors.ValidationError: If the configuration is not a DOI configuration or rclone fails.
        """
        provider = self.get_provider(configuration)
        if provider.name != "doi":
            raise errors.ValidationError(message="Configuration is not of type DOI")

        # Obscure configuration and transform if needed
        obscured_config = await self.obscure_config(configuration)

        config_path = self._write_temp_config(obscured_config)
        try:
            proc = await asyncio.create_subprocess_exec(
                "rclone",
                "backend",
                "metadata",
                "--config",
                str(config_path),
                "temp:",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
        finally:
            # Remove the temporary rclone config once rclone is done with it.
            config_path.unlink(missing_ok=True)

        if proc.returncode == 0:
            return RCloneDOIMetadata.model_validate_json(stdout.decode().strip())

        doi = configuration.get("doi", "<unknown>")
        raise errors.ValidationError(
            message=f"Could not resolve DOI {doi} or the hosting platform is not supported",
            detail=f"Reason: {stderr.decode().strip()}",
        )

    @staticmethod
    def transform_polybox_switchdriver_config(
        configuration: Union["RCloneConfig", dict[str, Any]],
    ) -> Union["RCloneConfig", dict[str, Any]]:
        """Rewrite a Polybox or SwitchDrive configuration into a webdav configuration.

        Configurations of any other type are returned unchanged. Polybox and SwitchDrive
        configurations are modified in place and returned.

        Raises:
            ValueError: If a non-personal configuration has no `public_link`.
        """
        storage_type = configuration.get("type")

        # Only process Polybox or SwitchDrive configurations
        if storage_type not in {"polybox", "switchDrive"}:
            return configuration

        configuration["type"] = "webdav"
        host = "https://polybox.ethz.ch" if storage_type == "polybox" else "https://drive.switch.ch"

        if configuration.get("provider") == "personal":
            # An explicitly configured url wins over the default one.
            configuration["url"] = configuration.get("url") or f"{host}/remote.php/webdav/"
            return configuration

        # Set url and username when it is a shared configuration
        configuration["url"] = f"{host}/public.php/webdav/"
        public_link = configuration.get("public_link")

        if not public_link:
            raise ValueError("Missing 'public_link' for public access configuration.")

        # The user is the last path segment of the public link; ignore trailing slashes so that
        # a link ending in "/" doesn't yield an empty user.
        configuration["user"] = public_link.rstrip("/").split("/")[-1]

        return configuration
×
237

238

239
class RCloneTriState(BaseModel):
    """A three-state flag (true, false or unset)."""

    # Populated from the `Value` key of the input.
    value: bool = Field(validation_alias="Value")
    # Populated from the `Valid` key of the input.
    valid: bool = Field(validation_alias="Valid")
2✔
244

245

246
class RCloneExample(BaseModel):
    """One allowed or suggested value of an RClone option.

    RClone names these "examples", but they behave like enum members: when the owning option is
    `exclusive`, only the values listed here may be used, and the list may be narrowed further by
    `provider` once a provider is selected.
    """

    # Populated from the `Value` key of the input.
    value: str = Field(validation_alias="Value")
    # Populated from the `Help` key of the input.
    help: str = Field(validation_alias="Help")
    # Populated from the optional `Provider` key of the input.
    provider: str | None = Field(default=None, validation_alias="Provider")
2✔
256

257

258
class RCloneOption(BaseModel):
2✔
259
    """Option for an RClone provider."""
260

261
    name: str = Field(validation_alias="Name")
2✔
262
    help: str = Field(validation_alias="Help")
2✔
263
    provider: str | None = Field(validation_alias="Provider", default=None)
2✔
264
    default: str | int | bool | list[str] | RCloneTriState | None = Field(validation_alias="Default")
2✔
265
    value: str | int | bool | RCloneTriState | None = Field(validation_alias="Value")
2✔
266
    examples: list[RCloneExample] | None = Field(default=None, validation_alias="Examples")
2✔
267
    short_opt: str | None = Field(validation_alias="ShortOpt", default=None)
2✔
268
    hide: int = Field(validation_alias="Hide")
2✔
269
    required: bool = Field(validation_alias="Required")
2✔
270
    is_password: bool = Field(validation_alias="IsPassword", serialization_alias="ispassword")
2✔
271
    no_prefix: bool = Field(validation_alias="NoPrefix")
2✔
272
    advanced: bool = Field(validation_alias="Advanced")
2✔
273
    exclusive: bool = Field(validation_alias="Exclusive")
2✔
274
    sensitive: bool = Field(validation_alias="Sensitive")
2✔
275
    default_str: str = Field(validation_alias="DefaultStr")
2✔
276
    value_str: str = Field(validation_alias="ValueStr")
2✔
277
    type: str = Field(validation_alias="Type")
2✔
278

279
    @property
2✔
280
    def is_sensitive(self) -> bool:
2✔
281
        """Whether this options is sensitive (e.g. credentials) or not."""
282
        return self.sensitive or self.is_password
1✔
283

284
    def matches_provider(self, provider: str | None) -> bool:
2✔
285
        """Check if this option applies for a provider.
286

287
        Note:
288
            The field can contain multiple providers separated by comma and can be preceded by a '!'
289
            which flips the matching logic.
290
        """
291
        if self.provider is None or self.provider == "":
1✔
292
            return True
1✔
293

294
        match_type = True
1✔
295
        provider_check = [self.provider]
1✔
296
        if provider_check[0].startswith("!"):
1✔
297
            match_type = False
1✔
298
            provider_check = [provider_check[0].lstrip("!")]
1✔
299
        if "," in provider_check[0]:
1✔
300
            provider_check = provider_check[0].split(",")
1✔
301

302
        return (provider in provider_check) == match_type
1✔
303

304
    def validate_config(
2✔
305
        self, value: Any, provider: str | None, keep_sensitive: bool = False
306
    ) -> int | bool | dict | str:
307
        """Validate an RClone option.
308

309
        Sensitive values are replaced with '<sensitive>' placeholders that clients are expected to handle.
310
        The placeholders indicate that a value should be there without storing the value.
311
        """
312
        if not keep_sensitive and self.is_sensitive:
1✔
313
            return "<sensitive>"
1✔
314
        match self.type:
1✔
315
            case "int" | "Duration" | "SizeSuffix" | "MultiEncoder":
1✔
316
                if not isinstance(value, int):
×
317
                    raise errors.ValidationError(message=f"Value '{value}' for field '{self.name}' is not of type int")
×
318
            case "bool":
1✔
319
                if not isinstance(value, bool):
×
320
                    raise errors.ValidationError(message=f"Value '{value}' for field '{self.name}' is not of type bool")
×
321
            case "Tristate":
1✔
322
                if not isinstance(value, dict):
×
323
                    raise errors.ValidationError(
×
324
                        message=f"Value '{value}' for field '{self.name}' is not of type Dict(Tristate)"
325
                    )
326
            case "string" | _:
1✔
327
                if not isinstance(value, str):
1✔
328
                    raise errors.ValidationError(
1✔
329
                        message=f"Value '{value}' for field '{self.name}' is not of type string"
330
                    )
331

332
        if (
1✔
333
            self.examples
334
            and self.exclusive
335
            and not any(e.value == str(value) and (not e.provider or e.provider == provider) for e in self.examples)
336
        ):
337
            raise errors.ValidationError(message=f"Value '{value}' is not valid for field {self.name}")
×
338
        return cast(int | bool | dict | str, value)
1✔
339

340

341
class RCloneProviderSchema(BaseModel):
    """Schema for an RClone provider: its metadata and the options it accepts."""

    name: str = Field(validation_alias="Name")
    description: str = Field(validation_alias="Description")
    prefix: str = Field(validation_alias="Prefix")
    options: list[RCloneOption] = Field(validation_alias="Options")
    command_help: list[dict[str, Any]] | None = Field(validation_alias="CommandHelp")
    aliases: list[str] | None = Field(validation_alias="Aliases")
    hide: bool = Field(validation_alias="Hide")
    metadata_info: dict[str, Any] | None = Field(validation_alias="MetadataInfo")

    @property
    def required_options(self) -> list[RCloneOption]:
        """Returns all required options for this provider."""
        return [o for o in self.options if o.required]

    @property
    def sensitive_options(self) -> list[RCloneOption]:
        """Returns all sensitive options for this provider."""
        return [o for o in self.options if o.is_sensitive]

    @property
    def password_options(self) -> list[RCloneOption]:
        """Returns all password options for this provider."""
        return [o for o in self.options if o.is_password]

    def get_option_for_provider(self, name: str, provider: str | None) -> RCloneOption | None:
        """Get the first option called `name` that applies to `provider`, or None if there is none."""
        return next((o for o in self.options if o.name == name and o.matches_provider(provider)), None)

    def check_unsafe_option(self, name: str) -> None:
        """Check that the option is safe.

        Only options of the `sftp` provider are restricted.

        Raises:
            errors.ValidationError: If `name` is a banned sftp option.
        """
        if self.prefix == "sftp" and name in BANNED_SFTP_OPTIONS:
            raise errors.ValidationError(message=f"The {name} option is not allowed.")

    def validate_config(
        self, configuration: Union["RCloneConfig", dict[str, Any]], keep_sensitive: bool = False
    ) -> None:
        """Validate an RClone config in place.

        Keys with a `None` value are removed and every value of a known option is replaced by its
        validated form. Keys that are not part of the schema are kept as they are.

        Raises:
            errors.ValidationError: If required options are missing, a banned option is used or a
                value is invalid for its option.
        """
        keys = set(configuration.keys()) - {"type"}
        provider: str | None = configuration.get("provider")

        # remove None values to allow for deletion
        for key in list(keys):
            if configuration[key] is None:
                del configuration[key]
                keys.remove(key)

        missing = [
            required.name
            for required in self.required_options
            if required.name not in configuration and required.matches_provider(provider)
        ]
        if missing:
            missing_str = "\n".join(missing)
            raise errors.ValidationError(message=f"The following fields are required but missing:\n{missing_str}")

        for key in keys:
            self.check_unsafe_option(key)

            option = self.get_option_for_provider(key, provider)
            if option is None:
                logger.info("Couldn't find option '%s' for storage '%s' and provider '%s'", key, self.name, provider)
                # some options don't show up in the schema, e.g. for provider 'Other' for S3.
                # We can't actually validate those, so we just continue
                continue

            configuration[key] = option.validate_config(
                configuration[key], provider=provider, keep_sensitive=keep_sensitive
            )

    def remove_sensitive_options_from_config(self, configuration: Union["RCloneConfig", dict[str, Any]]) -> None:
        """Remove sensitive options from configuration (in place)."""
        for sensitive in self.sensitive_options:
            if sensitive.name in configuration:
                del configuration[sensitive.name]

    async def obscure_password_options(
        self, configuration: Union["RCloneConfig", dict[str, Any]]
    ) -> Union["RCloneConfig", dict[str, Any]]:
        """Obscure all password options with `rclone obscure`.

        The configuration is modified in place and returned. Password options without a value are skipped.

        Raises:
            errors.ConfigurationError: If rclone fails to obscure a value.
        """
        for passwd in self.password_options:
            val = configuration.get(passwd.name)
            if not val:
                continue
            secret = str(val)
            # Hand the secret over on stdin (`rclone obscure -`) so that it doesn't show up in the
            # process list and can't be parsed as a flag when it starts with '-'. rclone only reads
            # the first line of stdin, so values containing line breaks are still passed as argument.
            via_stdin = "\n" not in secret and "\r" not in secret
            proc = await asyncio.create_subprocess_exec(
                "rclone",
                "obscure",
                "-" if via_stdin else secret,
                stdin=asyncio.subprocess.PIPE if via_stdin else None,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            result, _ = await proc.communicate(secret.encode() if via_stdin else None)
            if proc.returncode != 0:
                raise errors.ConfigurationError(message=f"Couldn't obscure password value for field '{passwd.name}'")
            configuration[passwd.name] = result.decode().strip()
        return configuration

    def get_private_fields(
        self, configuration: Union["RCloneConfig", dict[str, Any]]
    ) -> Generator[RCloneOption, None, None]:
        """Yield the sensitive options that apply to the configuration's provider and are set in it."""
        provider: str | None = configuration.get("provider")

        for option in self.options:
            if option.is_sensitive and option.matches_provider(provider) and option.name in configuration:
                yield option
1✔
466

467

468
class RCloneDOIMetadata(BaseModel):
    """Metadata that rclone reports about a DOI remote."""

    # Populated from / serialized to the `DOI` key.
    doi: str = Field(alias="DOI")
    # Populated from / serialized to the `URL` key.
    url: str = Field(alias="URL")
    # Populated from / serialized to the `metadataURL` key.
    metadata_url: str = Field(alias="metadataURL")
    # No alias, the key is `provider`.
    provider: str = Field()
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc