• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 10353957042

12 Aug 2024 02:35PM UTC coverage: 90.758% (+0.4%) from 90.398%
10353957042

Pull #338

github

web-flow
Merge 3a49eb6c2 into 8afb94949
Pull Request #338: feat!: expand environments specification

227 of 237 new or added lines in 7 files covered. (95.78%)

48 existing lines in 9 files now uncovered.

9202 of 10139 relevant lines covered (90.76%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.65
/components/renku_data_services/storage/blueprints.py
1
"""Cloud storage app."""
2✔
2

3
from dataclasses import dataclass
2✔
4
from typing import Any
2✔
5

6
from sanic import HTTPResponse, Request, empty, json
2✔
7
from sanic.response import JSONResponse
2✔
8
from sanic_ext import validate
2✔
9
from ulid import ULID
2✔
10

11
import renku_data_services.base_models as base_models
2✔
12
from renku_data_services import errors
2✔
13
from renku_data_services.base_api.auth import authenticate
2✔
14
from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint
2✔
15
from renku_data_services.base_api.misc import validate_query
2✔
16
from renku_data_services.storage import apispec, models
2✔
17
from renku_data_services.storage.db import StorageRepository, StorageV2Repository
2✔
18
from renku_data_services.storage.rclone import RCloneValidator
2✔
19

20

21
def dump_storage_with_sensitive_fields(storage: models.CloudStorage, validator: RCloneValidator) -> dict[str, Any]:
2✔
22
    """Dump a CloudStorage model alongside sensitive fields."""
23
    return apispec.CloudStorageGet.model_validate(
1✔
24
        {
25
            "storage": apispec.CloudStorageWithId.model_validate(storage).model_dump(exclude_none=True),
26
            "sensitive_fields": validator.get_private_fields(storage.configuration),
27
        }
28
    ).model_dump(exclude_none=True)
29

30

31
@dataclass(kw_only=True)
2✔
32
class StorageBP(CustomBlueprint):
2✔
33
    """Handlers for manipulating storage definitions."""
2✔
34

35
    storage_repo: StorageRepository
2✔
36
    authenticator: base_models.Authenticator
2✔
37

38
    def get(self) -> BlueprintFactoryResponse:
2✔
39
        """Get cloud storage for a repository."""
40

41
        @authenticate(self.authenticator)
2✔
42
        @validate_query(query=apispec.StorageParams)
2✔
43
        async def _get(
2✔
44
            request: Request,
45
            user: base_models.APIUser,
46
            validator: RCloneValidator,
47
            query: apispec.StorageParams,
48
        ) -> JSONResponse:
49
            storage: list[models.CloudStorage]
50
            storage = await self.storage_repo.get_storage(user=user, project_id=query.project_id)
2✔
51

52
            return json([dump_storage_with_sensitive_fields(s, validator) for s in storage])
2✔
53

54
        return "/storage", ["GET"], _get
2✔
55

56
    def get_one(self) -> BlueprintFactoryResponse:
2✔
57
        """Get a single storage by id."""
58

59
        @authenticate(self.authenticator)
2✔
60
        async def _get_one(
2✔
61
            request: Request,
62
            user: base_models.APIUser,
63
            storage_id: ULID,
64
            validator: RCloneValidator,
65
        ) -> JSONResponse:
66
            storage = await self.storage_repo.get_storage_by_id(storage_id, user=user)
1✔
67

68
            return json(dump_storage_with_sensitive_fields(storage, validator))
×
69

70
        return "/storage/<storage_id:ulid>", ["GET"], _get_one
2✔
71

72
    def post(self) -> BlueprintFactoryResponse:
2✔
73
        """Create a new cloud storage entry."""
74

75
        @authenticate(self.authenticator)
2✔
76
        async def _post(request: Request, user: base_models.APIUser, validator: RCloneValidator) -> JSONResponse:
2✔
77
            storage: models.CloudStorage
78
            if not isinstance(request.json, dict):
2✔
79
                body_type = type(request.json)
1✔
80
                raise errors.ValidationError(
1✔
81
                    message=f"The payload is supposed to be a dictionary, got {body_type.__name__}"
82
                )
83
            if "storage_url" in request.json:
1✔
84
                url_body = apispec.CloudStorageUrl(**request.json)
1✔
85
                storage = models.CloudStorage.from_url(
1✔
86
                    storage_url=url_body.storage_url,
87
                    project_id=url_body.project_id.root,
88
                    name=url_body.name,
89
                    target_path=url_body.target_path,
90
                    readonly=url_body.readonly,
91
                )
92
            else:
93
                body = apispec.CloudStorage(**request.json)
1✔
94
                storage = models.CloudStorage.from_dict(body.model_dump())
1✔
95

96
            validator.validate(storage.configuration.model_dump())
1✔
97

98
            res = await self.storage_repo.insert_storage(storage=storage, user=user)
1✔
99
            return json(dump_storage_with_sensitive_fields(res, validator), 201)
1✔
100

101
        return "/storage", ["POST"], _post
2✔
102

103
    def put(self) -> BlueprintFactoryResponse:
2✔
104
        """Replace a storage entry."""
105

106
        @authenticate(self.authenticator)
2✔
107
        async def _put(
2✔
108
            request: Request,
109
            user: base_models.APIUser,
110
            storage_id: ULID,
111
            validator: RCloneValidator,
112
        ) -> JSONResponse:
113
            if not request.json:
1✔
114
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
×
115
            if not isinstance(request.json, dict):
1✔
116
                raise errors.ValidationError(message="The request body is not a valid JSON object.")
×
117
            if "storage_url" in request.json:
1✔
118
                url_body = apispec.CloudStorageUrl(**request.json)
×
119
                new_storage = models.CloudStorage.from_url(
×
120
                    storage_url=url_body.storage_url,
121
                    project_id=url_body.project_id.root,
122
                    name=url_body.name,
123
                    target_path=url_body.target_path,
124
                    readonly=url_body.readonly,
125
                )
126
            else:
127
                body = apispec.CloudStorage(**request.json)
1✔
128
                new_storage = models.CloudStorage.from_dict(body.model_dump())
1✔
129

130
            validator.validate(new_storage.configuration.model_dump())
1✔
131
            body_dict = new_storage.model_dump()
1✔
132
            del body_dict["storage_id"]
1✔
133
            res = await self.storage_repo.update_storage(storage_id=storage_id, user=user, **body_dict)
1✔
134
            return json(dump_storage_with_sensitive_fields(res, validator))
1✔
135

136
        return "/storage/<storage_id:ulid>", ["PUT"], _put
2✔
137

138
    def patch(self) -> BlueprintFactoryResponse:
2✔
139
        """Update parts of a storage entry."""
140

141
        @authenticate(self.authenticator)
2✔
142
        @validate(json=apispec.CloudStoragePatch)
2✔
143
        async def _patch(
2✔
144
            request: Request,
145
            user: base_models.APIUser,
146
            storage_id: ULID,
147
            body: apispec.CloudStoragePatch,
148
            validator: RCloneValidator,
149
        ) -> JSONResponse:
150
            existing_storage = await self.storage_repo.get_storage_by_id(storage_id, user=user)
1✔
151
            if body.configuration is not None:
1✔
152
                # we need to apply the patch to the existing storage to properly validate it
153
                body.configuration = {**existing_storage.configuration, **body.configuration}
1✔
154

155
                for k, v in list(body.configuration.items()):
1✔
156
                    if v is None:
1✔
157
                        # delete fields that were unset
158
                        del body.configuration[k]
1✔
159
                validator.validate(body.configuration)
1✔
160

161
            body_dict = body.model_dump(exclude_none=True)
1✔
162

163
            res = await self.storage_repo.update_storage(storage_id=storage_id, user=user, **body_dict)
1✔
164
            return json(dump_storage_with_sensitive_fields(res, validator))
1✔
165

166
        return "/storage/<storage_id:ulid>", ["PATCH"], _patch
2✔
167

168
    def delete(self) -> BlueprintFactoryResponse:
2✔
169
        """Delete a storage entry."""
170

171
        @authenticate(self.authenticator)
2✔
172
        async def _delete(request: Request, user: base_models.APIUser, storage_id: ULID) -> HTTPResponse:
2✔
173
            await self.storage_repo.delete_storage(storage_id=storage_id, user=user)
1✔
174
            return empty(204)
1✔
175

176
        return "/storage/<storage_id:ulid>", ["DELETE"], _delete
2✔
177

178

179
@dataclass(kw_only=True)
2✔
180
class StoragesV2BP(CustomBlueprint):
2✔
181
    """Handlers for manipulating storage definitions."""
2✔
182

183
    storage_v2_repo: StorageV2Repository
2✔
184
    authenticator: base_models.Authenticator
2✔
185

186
    def get(self) -> BlueprintFactoryResponse:
2✔
187
        """Get cloud storage for a repository."""
188

189
        @authenticate(self.authenticator)
2✔
190
        @validate_query(query=apispec.StorageV2Params)
2✔
191
        async def _get(
2✔
192
            request: Request,
193
            user: base_models.APIUser,
194
            validator: RCloneValidator,
195
            query: apispec.StorageV2Params,
196
        ) -> JSONResponse:
197
            storage: list[models.CloudStorage]
198
            storage = await self.storage_v2_repo.get_storage(user=user, project_id=query.project_id)
1✔
199

200
            return json([dump_storage_with_sensitive_fields(s, validator) for s in storage])
1✔
201

202
        return "/storages_v2", ["GET"], _get
2✔
203

204
    def get_one(self) -> BlueprintFactoryResponse:
2✔
205
        """Get a single storage by id."""
206

207
        @authenticate(self.authenticator)
2✔
208
        async def _get_one(
2✔
209
            request: Request,
210
            user: base_models.APIUser,
211
            storage_id: ULID,
212
            validator: RCloneValidator,
213
        ) -> JSONResponse:
214
            storage = await self.storage_v2_repo.get_storage_by_id(storage_id, user=user)
1✔
215

216
            return json(dump_storage_with_sensitive_fields(storage, validator))
1✔
217

218
        return "/storages_v2/<storage_id:ulid>", ["GET"], _get_one
2✔
219

220
    def post(self) -> BlueprintFactoryResponse:
2✔
221
        """Create a new cloud storage entry."""
222

223
        @authenticate(self.authenticator)
2✔
224
        async def _post(request: Request, user: base_models.APIUser, validator: RCloneValidator) -> JSONResponse:
2✔
225
            storage: models.CloudStorage
226
            if not isinstance(request.json, dict):
2✔
227
                body_type = type(request.json)
1✔
228
                raise errors.ValidationError(
1✔
229
                    message=f"The payload is supposed to be a dictionary, got {body_type.__name__}"
230
                )
231
            if "storage_url" in request.json:
1✔
232
                url_body = apispec.CloudStorageUrl(**request.json)
×
233
                storage = models.CloudStorage.from_url(
×
234
                    storage_url=url_body.storage_url,
235
                    project_id=url_body.project_id.root,
236
                    name=url_body.name,
237
                    target_path=url_body.target_path,
238
                    readonly=url_body.readonly,
239
                )
240
            else:
241
                body = apispec.CloudStorage(**request.json)
1✔
242
                storage = models.CloudStorage.from_dict(body.model_dump())
1✔
243

244
            validator.validate(storage.configuration.model_dump())
1✔
245

246
            res = await self.storage_v2_repo.insert_storage(storage=storage, user=user)
1✔
247
            return json(dump_storage_with_sensitive_fields(res, validator), 201)
1✔
248

249
        return "/storages_v2", ["POST"], _post
2✔
250

251
    def patch(self) -> BlueprintFactoryResponse:
2✔
252
        """Update parts of a storage entry."""
253

254
        @authenticate(self.authenticator)
2✔
255
        @validate(json=apispec.CloudStoragePatch)
2✔
256
        async def _patch(
2✔
257
            request: Request,
258
            user: base_models.APIUser,
259
            storage_id: ULID,
260
            body: apispec.CloudStoragePatch,
261
            validator: RCloneValidator,
262
        ) -> JSONResponse:
263
            existing_storage = await self.storage_v2_repo.get_storage_by_id(storage_id, user=user)
1✔
264
            if body.configuration is not None:
1✔
265
                # we need to apply the patch to the existing storage to properly validate it
266
                body.configuration = {**existing_storage.configuration, **body.configuration}
1✔
267

268
                for k, v in list(body.configuration.items()):
1✔
269
                    if v is None:
1✔
270
                        # delete fields that were unset
271
                        del body.configuration[k]
1✔
272
                validator.validate(body.configuration)
1✔
273

274
            body_dict = body.model_dump(exclude_none=True)
1✔
275

276
            res = await self.storage_v2_repo.update_storage(storage_id=storage_id, user=user, **body_dict)
1✔
277
            return json(dump_storage_with_sensitive_fields(res, validator))
1✔
278

279
        return "/storages_v2/<storage_id:ulid>", ["PATCH"], _patch
2✔
280

281
    def delete(self) -> BlueprintFactoryResponse:
2✔
282
        """Delete a storage entry."""
283

284
        @authenticate(self.authenticator)
2✔
285
        async def _delete(request: Request, user: base_models.APIUser, storage_id: ULID) -> HTTPResponse:
2✔
286
            await self.storage_v2_repo.delete_storage(storage_id=storage_id, user=user)
1✔
287
            return empty(204)
1✔
288

289
        return "/storages_v2/<storage_id:ulid>", ["DELETE"], _delete
2✔
290

291

292
@dataclass(kw_only=True)
2✔
293
class StorageSchemaBP(CustomBlueprint):
2✔
294
    """Handler for getting RClone storage schema."""
2✔
295

296
    def get(self) -> BlueprintFactoryResponse:
2✔
297
        """Get cloud storage for a repository."""
298

299
        async def _get(_: Request, validator: RCloneValidator) -> JSONResponse:
2✔
300
            return json(validator.asdict())
2✔
301

302
        return "/storage_schema", ["GET"], _get
2✔
303

304
    def test_connection(self) -> BlueprintFactoryResponse:
2✔
305
        """Validate an RClone config."""
306

307
        async def _test_connection(request: Request, validator: RCloneValidator) -> HTTPResponse:
2✔
308
            if not request.json:
2✔
309
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
1✔
310
            if not isinstance(request.json, dict):
2✔
311
                raise errors.ValidationError(message="The request body is not a valid JSON object.")
×
312
            if not request.json.get("configuration"):
2✔
313
                raise errors.ValidationError(message="No 'configuration' sent.")
1✔
314
            if not isinstance(request.json.get("configuration"), dict):
2✔
UNCOV
315
                config_type = type(request.json.get("configuration"))
×
UNCOV
316
                raise errors.ValidationError(
×
317
                    message=f"The R clone configuration should be a dictionary, not {config_type.__name__}"
318
                )
319
            if not request.json.get("source_path"):
2✔
320
                raise errors.ValidationError(message="'source_path' is required to test the connection.")
2✔
321
            validator.validate(request.json["configuration"], keep_sensitive=True)
2✔
322
            result = await validator.test_connection(request.json["configuration"], request.json["source_path"])
1✔
323
            if not result.success:
1✔
324
                raise errors.ValidationError(message=result.error)
1✔
325
            return empty(204)
1✔
326

327
        return "/storage_schema/test_connection", ["POST"], _test_connection
2✔
328

329
    def validate(self) -> BlueprintFactoryResponse:
2✔
330
        """Validate an RClone config."""
331

332
        async def _validate(request: Request, validator: RCloneValidator) -> HTTPResponse:
2✔
333
            if not request.json:
2✔
334
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
2✔
335
            if not isinstance(request.json, dict):
2✔
336
                raise errors.ValidationError(message="The request body is not a valid JSON object.")
1✔
337
            validator.validate(request.json, keep_sensitive=True)
1✔
338
            return empty(204)
1✔
339

340
        return "/storage_schema/validate", ["POST"], _validate
2✔
341

342
    def obscure(self) -> BlueprintFactoryResponse:
2✔
343
        """Obscure values in config."""
344

345
        async def _obscure(request: Request, validator: RCloneValidator) -> JSONResponse:
2✔
346
            if not request.json:
2✔
347
                raise errors.ValidationError(message="The request body is empty. Please provide a valid JSON object.")
1✔
348
            if not isinstance(request.json, dict):
2✔
349
                raise errors.ValidationError(message="The request body is not a valid JSON object.")
1✔
350
            config = await validator.obscure_config(request.json)
2✔
351
            return json(config)
1✔
352

353
        return "/storage_schema/obscure", ["POST"], _obscure
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc