• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19035163548

03 Nov 2025 12:46PM UTC coverage: 86.797% (-0.02%) from 86.812%
19035163548

Pull #1094

github

web-flow
Merge 1a36ce7e6 into d7d3167bd
Pull Request #1094: feat: add user alerts

22739 of 26198 relevant lines covered (86.8%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.68
/components/renku_data_services/storage/blueprints.py
1
"""Cloud storage app."""
2

3
from dataclasses import dataclass
2✔
4
from typing import Any
2✔
5

6
from pydantic import ValidationError as PydanticValidationError
2✔
7
from sanic import HTTPResponse, Request, empty
2✔
8
from sanic.response import JSONResponse
2✔
9
from sanic_ext import validate
2✔
10
from ulid import ULID
2✔
11

12
import renku_data_services.base_models as base_models
2✔
13
from renku_data_services import errors
2✔
14
from renku_data_services.base_api.auth import authenticate
2✔
15
from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint
2✔
16
from renku_data_services.base_api.misc import validate_query
2✔
17
from renku_data_services.base_models.validation import validated_json
2✔
18
from renku_data_services.storage import apispec, models
2✔
19
from renku_data_services.storage.db import StorageRepository
2✔
20
from renku_data_services.storage.rclone import RCloneValidator
2✔
21

22

23
def dump_storage_with_sensitive_fields(storage: models.CloudStorage, validator: RCloneValidator) -> dict[str, Any]:
    """Serialize a cloud storage together with the private fields of its configuration.

    The storage is validated against ``apispec.CloudStorageWithId`` and combined with the
    dumped entries returned by ``validator.get_private_fields`` into an
    ``apispec.CloudStorageGet`` model. ``None`` values are excluded from every dump.

    Raises:
        errors.ProgrammingError: if pydantic rejects the data while the response is assembled.
    """
    try:
        storage_payload = apispec.CloudStorageWithId.model_validate(storage).model_dump(exclude_none=True)
        private_fields = [
            option.model_dump(exclude_none=True, by_alias=True)
            for option in validator.get_private_fields(storage.configuration)
        ]
        response = apispec.CloudStorageGet.model_validate(
            {"storage": storage_payload, "sensitive_fields": private_fields}
        )
        return response.model_dump(exclude_none=True)
    except PydanticValidationError as err:
        # The data being validated comes from the server side, hence a programming error
        # is reported rather than a request validation error.
        problems = ", ".join(f"{'.'.join(map(str, issue['loc']))}: {issue['msg']}" for issue in err.errors())
        message = (
            "The server could not construct a valid response. "
            f"Errors found in the following fields: {problems}"
        )
        raise errors.ProgrammingError(message=message) from err
42

43

44
@dataclass(kw_only=True)
class StorageBP(CustomBlueprint):
    """Blueprint holding the HTTP handlers for cloud storage definitions.

    Every public method is a factory returning the route path, the allowed HTTP
    methods and the coroutine handling the request.
    """

    # Repository through which all handlers read and write storage entries.
    storage_repo: StorageRepository
    # Handed to the ``authenticate`` decorator that wraps every handler.
    authenticator: base_models.Authenticator

    def get(self) -> BlueprintFactoryResponse:
        """List the cloud storages of the project given in the query parameters."""

        @authenticate(self.authenticator)
        @validate_query(query=apispec.StorageParams)
        async def _get(
            request: Request,
            user: base_models.APIUser,
            validator: RCloneValidator,
            query: apispec.StorageParams,
        ) -> JSONResponse:
            storages = await self.storage_repo.get_storage(user=user, project_id=query.project_id)
            payload = [dump_storage_with_sensitive_fields(item, validator) for item in storages]
            return validated_json(apispec.StorageGetResponse, payload)

        return "/storage", ["GET"], _get

    def get_one(self) -> BlueprintFactoryResponse:
        """Return the single storage identified by the id in the URL."""

        @authenticate(self.authenticator)
        async def _get_one(
            request: Request,
            user: base_models.APIUser,
            storage_id: ULID,
            validator: RCloneValidator,
        ) -> JSONResponse:
            found = await self.storage_repo.get_storage_by_id(storage_id, user=user)
            payload = dump_storage_with_sensitive_fields(found, validator)
            return validated_json(apispec.CloudStorageGet, payload)

        return "/storage/<storage_id:ulid>", ["GET"], _get_one

    def post(self) -> BlueprintFactoryResponse:
        """Create a cloud storage entry from a JSON payload.

        The payload is read as ``apispec.CloudStorageUrl`` when it has a ``storage_url``
        key and as ``apispec.CloudStorage`` otherwise. The handler answers with status 201.
        """

        @authenticate(self.authenticator)
        async def _post(request: Request, user: base_models.APIUser, validator: RCloneValidator) -> JSONResponse:
            payload = request.json
            if not isinstance(payload, dict):
                raise errors.ValidationError(
                    message=f"The payload is supposed to be a dictionary, got {type(payload).__name__}"
                )

            unsaved: models.UnsavedCloudStorage
            if "storage_url" in payload:
                from_url = apispec.CloudStorageUrl(**payload)
                unsaved = models.UnsavedCloudStorage.from_url(
                    storage_url=from_url.storage_url,
                    project_id=from_url.project_id.root,
                    name=from_url.name,
                    target_path=from_url.target_path,
                    readonly=from_url.readonly,
                )
            else:
                parsed = apispec.CloudStorage(**payload)
                unsaved = models.UnsavedCloudStorage.from_dict(parsed.model_dump())

            # The configuration is checked before anything is written to the repository.
            validator.validate(unsaved.configuration.model_dump())

            created = await self.storage_repo.insert_storage(storage=unsaved, user=user)
            return validated_json(apispec.CloudStorageGet, dump_storage_with_sensitive_fields(created, validator), 201)

        return "/storage", ["POST"], _post

    def put(self) -> BlueprintFactoryResponse:
        """Replace the storage entry identified by the id in the URL.

        The payload is interpreted the same way as for creation: ``apispec.CloudStorageUrl``
        when a ``storage_url`` key is present, ``apispec.CloudStorage`` otherwise.
        """

        @authenticate(self.authenticator)
        async def _put(
            request: Request,
            user: base_models.APIUser,
            storage_id: ULID,
            validator: RCloneValidator,
        ) -> JSONResponse:
            payload = request.json
            # Falsy payloads (including an empty object) are rejected before the type check.
            if not payload:
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
            if not isinstance(payload, dict):
                raise errors.ValidationError(message="The request body is not a valid JSON object.")

            if "storage_url" in payload:
                from_url = apispec.CloudStorageUrl(**payload)
                replacement = models.UnsavedCloudStorage.from_url(
                    storage_url=from_url.storage_url,
                    project_id=from_url.project_id.root,
                    name=from_url.name,
                    target_path=from_url.target_path,
                    readonly=from_url.readonly,
                )
            else:
                parsed = apispec.CloudStorage(**payload)
                replacement = models.UnsavedCloudStorage.from_dict(parsed.model_dump())

            validator.validate(replacement.configuration.model_dump())
            updated = await self.storage_repo.update_storage(
                storage_id=storage_id, user=user, **replacement.model_dump()
            )
            return validated_json(apispec.CloudStorageGet, dump_storage_with_sensitive_fields(updated, validator))

        return "/storage/<storage_id:ulid>", ["PUT"], _put

    def patch(self) -> BlueprintFactoryResponse:
        """Update selected fields of the storage entry identified by the id in the URL."""

        @authenticate(self.authenticator)
        @validate(json=apispec.CloudStoragePatch)
        async def _patch(
            request: Request,
            user: base_models.APIUser,
            storage_id: ULID,
            body: apispec.CloudStoragePatch,
            validator: RCloneValidator,
        ) -> JSONResponse:
            current = await self.storage_repo.get_storage_by_id(storage_id, user=user)
            if body.configuration is not None:
                # The patch has to be applied on top of the stored configuration so that
                # the resulting configuration can be validated as a whole.
                merged = {**current.configuration, **body.configuration}
                # Keys whose value is None were unset by the patch and are dropped.
                body.configuration = {key: value for key, value in merged.items() if value is not None}
                validator.validate(body.configuration)

            changes = body.model_dump(exclude_none=True)

            updated = await self.storage_repo.update_storage(storage_id=storage_id, user=user, **changes)
            return validated_json(apispec.CloudStorageGet, dump_storage_with_sensitive_fields(updated, validator))

        return "/storage/<storage_id:ulid>", ["PATCH"], _patch

    def delete(self) -> BlueprintFactoryResponse:
        """Delete the storage entry identified by the id in the URL and answer with 204."""

        @authenticate(self.authenticator)
        async def _delete(request: Request, user: base_models.APIUser, storage_id: ULID) -> HTTPResponse:
            await self.storage_repo.delete_storage(storage_id=storage_id, user=user)
            return empty(204)

        return "/storage/<storage_id:ulid>", ["DELETE"], _delete
190

191

192
@dataclass(kw_only=True)
class StorageSchemaBP(CustomBlueprint):
    """Blueprint with the handlers around the RClone storage schema.

    Inside the handler factories the name ``validate`` refers to the decorator imported
    at module level; class-level names such as the ``validate`` method below are not
    visible from within a method body.
    """

    def get(self) -> BlueprintFactoryResponse:
        """Return the RClone schema as provided by the validator."""

        async def _get(_: Request, validator: RCloneValidator) -> JSONResponse:
            schema = validator.asdict()
            return validated_json(apispec.RCloneSchema, schema)

        return "/storage_schema", ["GET"], _get

    def test_connection(self) -> BlueprintFactoryResponse:
        """Validate an RClone config and test the connection to the given source path."""

        @validate(json=apispec.StorageSchemaTestConnectionPostRequest)
        async def _test_connection(
            request: Request, validator: RCloneValidator, body: apispec.StorageSchemaTestConnectionPostRequest
        ) -> HTTPResponse:
            # The configuration is validated first; the connection test only runs afterwards.
            validator.validate(body.configuration, keep_sensitive=True)
            outcome = await validator.test_connection(body.configuration, body.source_path)
            if outcome.success:
                return empty(204)
            raise errors.ValidationError(message=outcome.error)

        return "/storage_schema/test_connection", ["POST"], _test_connection

    def validate(self) -> BlueprintFactoryResponse:
        """Validate an RClone config and answer with 204 when it is accepted."""

        @validate(json=apispec.RCloneConfigValidate)
        async def _validate(
            request: Request, validator: RCloneValidator, body: apispec.RCloneConfigValidate
        ) -> HTTPResponse:
            configuration = body.root
            if configuration is None:
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
            validator.validate(configuration, keep_sensitive=True)
            return empty(204)

        return "/storage_schema/validate", ["POST"], _validate

    def obscure(self) -> BlueprintFactoryResponse:
        """Return the submitted config after passing it through the validator's obscuring."""

        @validate(json=apispec.StorageSchemaObscurePostRequest)
        async def _obscure(
            request: Request, validator: RCloneValidator, body: apispec.StorageSchemaObscurePostRequest
        ) -> JSONResponse:
            obscured = await validator.obscure_config(body.configuration)
            return validated_json(apispec.RCloneConfigValidate, obscured)

        return "/storage_schema/obscure", ["POST"], _obscure
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc