• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 20209820564

14 Dec 2025 03:03PM UTC coverage: 86.235% (-0.04%) from 86.275%
20209820564

Pull #1139

github

web-flow
Merge 175ab0b96 into 2f0371f90
Pull Request #1139: chore(deps): bump SwissDataScienceCenter/renku-actions from 1.19.1 to 1.20.0

23901 of 27716 relevant lines covered (86.24%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.08
/components/renku_data_services/crc/core.py
1
"""crc modules converters and validators."""
2

3
from typing import Literal, overload
2✔
4
from urllib.parse import urlparse
2✔
5

6
from ulid import ULID
2✔
7

8
from renku_data_services.base_models import RESET, ResetType
2✔
9
from renku_data_services.crc import apispec, models
2✔
10
from renku_data_services.errors import errors
2✔
11

12

13
def validate_quota(body: apispec.QuotaWithOptionalId) -> models.UnsavedQuota:
    """Convert an API quota payload into an unsaved quota model.

    Only the ``cpu``, ``memory`` and ``gpu`` fields of the payload are carried over.
    """
    cpu, memory, gpu = body.cpu, body.memory, body.gpu
    return models.UnsavedQuota(cpu=cpu, memory=memory, gpu=gpu)
20

21

22
def validate_quota_put_patch(body: apispec.QuotaWithId | apispec.QuotaPatch) -> models.QuotaPatch:
    """Convert a PUT or PATCH quota payload into a quota patch model.

    Only the ``cpu``, ``memory`` and ``gpu`` fields of the payload are carried over.
    """
    cpu, memory, gpu = body.cpu, body.memory, body.gpu
    return models.QuotaPatch(cpu=cpu, memory=memory, gpu=gpu)
29

30

31
def validate_resource_class(body: apispec.ResourceClass) -> models.UnsavedResourceClass:
    """Check a new resource class payload and convert it to an unsaved model.

    A ``ValidationError`` is raised when the name is longer than 40 characters or when the
    default storage exceeds the maximum storage.
    """
    if len(body.name) > 40:
        # TODO: Should this be added to the API spec instead?
        raise errors.ValidationError(message="'name' cannot be longer than 40 characters.")
    if body.default_storage > body.max_storage:
        raise errors.ValidationError(message="The default storage cannot be larger than the max allowable storage.")

    # Node affinities and tolerations are stored sorted so that '__eq__' is reliable.
    affinity_models = [
        models.NodeAffinity(key=item.key, required_during_scheduling=item.required_during_scheduling)
        for item in (body.node_affinities or [])
    ]
    affinity_models.sort(key=lambda aff: (aff.key, aff.required_during_scheduling))
    toleration_keys = [item.root for item in (body.tolerations or [])]
    toleration_keys.sort()

    return models.UnsavedResourceClass(
        name=body.name,
        cpu=body.cpu,
        memory=body.memory,
        max_storage=body.max_storage,
        gpu=body.gpu,
        default=body.default,
        default_storage=body.default_storage,
        node_affinities=affinity_models,
        tolerations=toleration_keys,
    )
58

59

60
@overload
def validate_resource_class_patch_or_put(
    body: apispec.ResourceClassPatch, method: Literal["PATCH"]
) -> models.ResourceClassPatch: ...
@overload
def validate_resource_class_patch_or_put(
    body: apispec.ResourceClassPatchWithId, method: Literal["PATCH"]
) -> models.ResourceClassPatchWithId: ...
@overload
def validate_resource_class_patch_or_put(
    body: apispec.ResourceClass, method: Literal["PUT"]
) -> models.ResourceClassPatch: ...
@overload
def validate_resource_class_patch_or_put(
    body: apispec.ResourceClassWithId, method: Literal["PUT"]
) -> models.ResourceClassPatchWithId: ...
def validate_resource_class_patch_or_put(
    body: apispec.ResourceClassPatch
    | apispec.ResourceClassPatchWithId
    | apispec.ResourceClass
    | apispec.ResourceClassWithId,
    method: Literal["PATCH", "PUT"],
) -> models.ResourceClassPatch | models.ResourceClassPatchWithId:
    """Validate the patch to a resource class.

    The payload is converted to a ``ResourceClassPatchWithId`` when it is one of the ``...WithId``
    API types, and to a plain ``ResourceClassPatch`` otherwise.

    For ``method="PUT"`` missing or empty node affinities and tolerations become empty lists; for
    ``method="PATCH"`` they become ``None``. Note that an empty list in a PATCH payload is therefore
    treated the same as an omitted field.
    """
    rc_id = body.id if isinstance(body, (apispec.ResourceClassPatchWithId, apispec.ResourceClassWithId)) else None

    node_affinities: list[models.NodeAffinity] | None = [] if method == "PUT" else None
    if body.node_affinities:
        # Sorted so that equality checks on the resulting model do not depend on the input order.
        node_affinities = sorted(
            (
                models.NodeAffinity(key=na.key, required_during_scheduling=na.required_during_scheduling)
                for na in body.node_affinities
            ),
            key=lambda x: (x.key, x.required_during_scheduling),
        )

    tolerations: list[str] | None = [] if method == "PUT" else None
    if body.tolerations:
        tolerations = sorted(t.root for t in body.tolerations)

    # Compare against None explicitly: an ID of 0 is falsy but is still an ID, and the
    # overloads promise a '...WithId' result whenever the payload carries one.
    if rc_id is not None:
        return models.ResourceClassPatchWithId(
            id=rc_id,
            name=body.name,
            cpu=body.cpu,
            memory=body.memory,
            max_storage=body.max_storage,
            gpu=body.gpu,
            default=body.default,
            default_storage=body.default_storage,
            node_affinities=node_affinities,
            tolerations=tolerations,
        )
    return models.ResourceClassPatch(
        name=body.name,
        cpu=body.cpu,
        memory=body.memory,
        max_storage=body.max_storage,
        gpu=body.gpu,
        default=body.default,
        default_storage=body.default_storage,
        node_affinities=node_affinities,
        tolerations=tolerations,
    )
121

122

123
def validate_resource_class_update(
    existing: models.ResourceClass,
    update: models.ResourceClassPatch,
) -> None:
    """Check that applying ``update`` to ``existing`` yields a valid resource class.

    Fields left as ``None`` in the patch fall back to the value of the existing class.
    A ``ValidationError`` is raised when the patch changes the ``default`` flag, when the
    resulting name is longer than 40 characters, or when the resulting default storage
    exceeds the resulting maximum storage.
    """
    if update.default is not None and existing.default != update.default:
        raise errors.ValidationError(message="Changing the default class in a resource pool is not supported.")

    effective_name = existing.name if update.name is None else update.name
    if len(effective_name) > 40:
        # TODO: Should this be added to the API spec instead?
        raise errors.ValidationError(message="'name' cannot be longer than 40 characters.")

    effective_max_storage = existing.max_storage if update.max_storage is None else update.max_storage
    effective_default_storage = (
        existing.default_storage if update.default_storage is None else update.default_storage
    )
    if effective_default_storage > effective_max_storage:
        raise errors.ValidationError(message="The default storage cannot be larger than the max allowable storage.")
×
140

141

142
def validate_resource_pool(body: apispec.ResourcePool) -> models.UnsavedResourcePool:
    """Check a new resource pool payload and convert it to an unsaved model.

    Besides the per-field checks, every resource class must be compatible with the quota
    (when one is given) and exactly one class must be flagged as the default. A threshold
    of 0 is stored as ``None``.
    """
    if len(body.name) > 40:
        # TODO: Should this be added to the API spec instead?
        raise errors.ValidationError(message="'name' cannot be longer than 40 characters.")
    if body.default:
        # Constraints that only apply to the default resource pool.
        if not body.public:
            raise errors.ValidationError(message="The default resource pool has to be public.")
        if body.quota is not None:
            raise errors.ValidationError(message="A default resource pool cannot have a quota.")
        if body.remote:
            raise errors.ValidationError(message="The default resource pool cannot start remote sessions.")
    if body.remote and body.public:
        raise errors.ValidationError(message="A resource pool which starts remote sessions cannot be public.")
    for threshold in (body.idle_threshold, body.hibernation_threshold):
        if threshold and threshold < 0:
            raise errors.ValidationError(message="Idle threshold and hibernation threshold need to be larger than 0.")

    idle_threshold = None if body.idle_threshold == 0 else body.idle_threshold
    hibernation_threshold = None if body.hibernation_threshold == 0 else body.hibernation_threshold

    quota = None
    if body.quota:
        quota = validate_quota(body=body.quota)
    classes = [validate_resource_class(body=new_cls) for new_cls in body.classes]

    default_count = 0
    for cls in classes:
        if quota is not None and not quota.is_resource_class_compatible(cls):
            raise errors.ValidationError(
                message=f"The resource class with name {cls.name} is not compatible with the quota."
            )
        if cls.default:
            default_count += 1
    if default_count != 1:
        raise errors.ValidationError(message="One default class is required in each resource pool.")

    remote = validate_remote(body=body.remote) if body.remote else None
    platform = __validate_runtime_platform(body=body.platform)
    cluster_id = ULID.from_str(body.cluster_id) if body.cluster_id else None

    return models.UnsavedResourcePool(
        name=body.name,
        classes=classes,
        quota=quota,
        idle_threshold=idle_threshold,
        hibernation_threshold=hibernation_threshold,
        default=body.default,
        public=body.public,
        remote=remote,
        cluster_id=cluster_id,
        platform=platform,
    )
196

197

198
def validate_resource_pool_patch(body: apispec.ResourcePoolPatch) -> models.ResourcePoolPatch:
    """Convert a PATCH payload for a resource pool into a resource pool patch model.

    Fields that are absent (or falsy) in the payload become ``None`` in the patch. A threshold
    of 0 is translated to ``RESET``.
    """
    classes = None
    if body.classes:
        classes = [validate_resource_class_patch_or_put(body=rc, method="PATCH") for rc in body.classes]
    quota = validate_quota_put_patch(body=body.quota) if body.quota else None
    remote = validate_remote_patch(body=body.remote) if body.remote else None
    platform = __validate_runtime_platform(body=body.platform) if body.platform else None

    idle_threshold = RESET if body.idle_threshold == 0 else body.idle_threshold
    hibernation_threshold = RESET if body.hibernation_threshold == 0 else body.hibernation_threshold
    cluster_id = ULID.from_str(body.cluster_id) if body.cluster_id else None

    return models.ResourcePoolPatch(
        name=body.name,
        classes=classes,
        quota=quota,
        idle_threshold=idle_threshold,
        hibernation_threshold=hibernation_threshold,
        default=body.default,
        public=body.public,
        remote=remote,
        cluster_id=cluster_id,
        platform=platform,
    )
218

219

220
def validate_resource_pool_put(body: apispec.ResourcePoolPut) -> models.ResourcePoolPatch:
    """Convert a PUT payload for a resource pool into a resource pool patch model.

    A missing quota, platform or cluster ID is translated to ``RESET``; the remote
    configuration is handled by ``validate_remote_put``.
    """
    # NOTE: the current behavior is to do a PATCH-style update on nested classes for "PUT resource pool"
    classes = None
    if body.classes:
        classes = [validate_resource_class_patch_or_put(body=rc, method="PUT") for rc in body.classes]
    quota = validate_quota_put_patch(body=body.quota) if body.quota else RESET
    remote = validate_remote_put(body=body.remote)
    platform = __validate_runtime_platform(body=body.platform) if body.platform else RESET
    cluster_id = ULID.from_str(body.cluster_id) if body.cluster_id else RESET

    return models.ResourcePoolPatch(
        name=body.name,
        classes=classes,
        quota=quota,
        idle_threshold=body.idle_threshold,
        hibernation_threshold=body.hibernation_threshold,
        default=body.default,
        public=body.public,
        remote=remote,
        cluster_id=cluster_id,
        platform=platform,
    )
241

242

243
def validate_resource_pool_update(existing: models.ResourcePool, update: models.ResourcePoolPatch) -> None:
    """Validate the update to a resource pool.

    The patch is merged with ``existing`` in memory (``None`` in the patch means "keep the existing
    value", ``RESET`` means "remove it") and the merged result is checked. Nothing is returned and
    ``existing`` is left unmodified; a ``ValidationError`` is raised on the first violated rule.
    """
    name = update.name if update.name is not None else existing.name

    # --- Resource classes -------------------------------------------------------------------------
    # Work on a shallow copy: replacing entries below must not alter the caller's 'existing.classes'.
    classes = list(existing.classes)
    for rc in update.classes or []:
        found = next(((idx, candidate) for idx, candidate in enumerate(classes) if candidate.id == rc.id), None)
        if found is None:
            raise errors.ValidationError(
                message=f"Resource class '{rc.id}' does not exist in resource pool '{existing.id}'."
            )
        idx, existing_rc = found
        classes[idx] = models.ResourceClass(
            name=rc.name if rc.name is not None else existing_rc.name,
            cpu=rc.cpu if rc.cpu is not None else existing_rc.cpu,
            memory=rc.memory if rc.memory is not None else existing_rc.memory,
            max_storage=rc.max_storage if rc.max_storage is not None else existing_rc.max_storage,
            gpu=rc.gpu if rc.gpu is not None else existing_rc.gpu,
            id=existing_rc.id,
            default=rc.default if rc.default is not None else existing_rc.default,
            default_storage=rc.default_storage if rc.default_storage is not None else existing_rc.default_storage,
            node_affinities=rc.node_affinities if rc.node_affinities is not None else existing_rc.node_affinities,
            tolerations=rc.tolerations if rc.tolerations is not None else existing_rc.tolerations,
        )

    # --- Quota ------------------------------------------------------------------------------------
    quota: models.Quota | models.UnsavedQuota | ResetType = existing.quota if existing.quota else RESET
    if update.quota is RESET:
        quota = RESET
    elif update.quota is not None and existing.quota is None:
        # The quota patch needs to contain all required fields
        cpu = update.quota.cpu
        if cpu is None:
            raise errors.ValidationError(message="The 'quota.cpu' field is required when creating a new quota.")
        memory = update.quota.memory
        if memory is None:
            raise errors.ValidationError(message="The 'quota.memory' field is required when creating a new quota.")
        gpu = update.quota.gpu
        if gpu is None:
            raise errors.ValidationError(message="The 'quota.gpu' field is required when creating a new quota.")
        quota = models.UnsavedQuota(
            cpu=cpu,
            memory=memory,
            gpu=gpu,
        )
    elif isinstance(update.quota, models.QuotaPatch):
        # The previous branch handled "no existing quota", so one must exist here.
        assert existing.quota is not None
        quota = models.Quota(
            cpu=update.quota.cpu if update.quota.cpu is not None else existing.quota.cpu,
            memory=update.quota.memory if update.quota.memory is not None else existing.quota.memory,
            gpu=update.quota.gpu if update.quota.gpu is not None else existing.quota.gpu,
            gpu_kind=update.quota.gpu_kind if update.quota.gpu_kind is not None else existing.quota.gpu_kind,
            id=existing.quota.id,
        )

    # --- Scalar fields ----------------------------------------------------------------------------
    idle_threshold = update.idle_threshold if update.idle_threshold is not None else existing.idle_threshold
    hibernation_threshold = (
        update.hibernation_threshold if update.hibernation_threshold is not None else existing.hibernation_threshold
    )
    default = update.default if update.default is not None else existing.default
    public = update.public if update.public is not None else existing.public

    # --- Remote configuration ---------------------------------------------------------------------
    remote: models.RemoteConfigurationFirecrest | ResetType = existing.remote if existing.remote else RESET
    if update.remote is RESET:
        remote = RESET
    elif update.remote is not None and existing.remote is None:
        # The remote patch needs to contain all required fields
        kind = update.remote.kind
        if kind is None:
            raise errors.ValidationError(message="The 'remote.kind' field is required when creating a new remote.")
        api_url = update.remote.api_url
        if api_url is None:
            raise errors.ValidationError(message="The 'remote.api_url' field is required when creating a new remote.")
        system_name = update.remote.system_name
        if system_name is None:
            raise errors.ValidationError(
                message="The 'remote.system_name' field is required when creating a new remote."
            )
        remote = models.RemoteConfigurationFirecrest(
            kind=kind,
            provider_id=update.remote.provider_id,
            api_url=api_url,
            system_name=system_name,
            partition=update.remote.partition,
        )
    elif isinstance(update.remote, models.RemoteConfigurationFirecrestPatch):
        # The previous branch handled "no existing remote", so one must exist here.
        assert existing.remote is not None
        remote = models.RemoteConfigurationFirecrest(
            kind=update.remote.kind if update.remote.kind is not None else existing.remote.kind,
            provider_id=update.remote.provider_id
            if update.remote.provider_id is not None
            else existing.remote.provider_id,
            api_url=update.remote.api_url if update.remote.api_url is not None else existing.remote.api_url,
            system_name=update.remote.system_name
            if update.remote.system_name is not None
            else existing.remote.system_name,
            partition=update.remote.partition if update.remote.partition is not None else existing.remote.partition,
        )

    # --- Checks on the merged pool ----------------------------------------------------------------
    if len(name) > 40:
        # TODO: Should this be added to the API spec instead?
        raise errors.ValidationError(message="'name' cannot be longer than 40 characters.")
    if default and not public:
        raise errors.ValidationError(message="The default resource pool has to be public.")
    if default and quota is not RESET:
        raise errors.ValidationError(message="A default resource pool cannot have a quota.")
    if isinstance(remote, models.RemoteConfigurationFirecrest) and default:
        raise errors.ValidationError(message="The default resource pool cannot start remote sessions.")
    if isinstance(remote, models.RemoteConfigurationFirecrest) and public:
        raise errors.ValidationError(message="A resource pool which starts remote sessions cannot be public.")
    # The thresholds may also hold RESET, hence the isinstance checks before comparing.
    if (isinstance(idle_threshold, int) and idle_threshold < 0) or (
        isinstance(hibernation_threshold, int) and hibernation_threshold < 0
    ):
        raise errors.ValidationError(message="Idle threshold and hibernation threshold need to be larger than 0.")

    default_classes: list[models.ResourceClass] = []
    for cls in classes:
        if (isinstance(quota, (models.Quota, models.UnsavedQuota))) and not quota.is_resource_class_compatible(cls):
            raise errors.ValidationError(
                message=f"The resource class with name {cls.name} is not compatible with the quota."
            )
        if cls.default:
            default_classes.append(cls)
    if len(default_classes) != 1:
        raise errors.ValidationError(message="One default class is required in each resource pool.")
×
363

364

365
def validate_cluster(body: apispec.Cluster) -> models.ClusterSettings:
    """Build the cluster settings model from a REST API cluster payload."""
    protocol = models.SessionProtocol(body.session_protocol.value)
    ingress_annotations = body.session_ingress_annotations.model_dump()
    return models.ClusterSettings(
        name=body.name,
        config_name=body.config_name,
        session_protocol=protocol,
        session_host=body.session_host,
        session_port=body.session_port,
        session_path=body.session_path,
        session_ingress_class_name=body.session_ingress_class_name,
        session_ingress_annotations=ingress_annotations,
        session_tls_secret_name=body.session_tls_secret_name,
        session_storage_class=body.session_storage_class,
        service_account_name=body.service_account_name,
    )
380

381

382
def validate_cluster_patch(patch: apispec.ClusterPatch) -> models.ClusterPatch:
    """Build the cluster patch model from a REST API cluster patch payload.

    The session protocol and ingress annotations are only converted when present in the
    payload; otherwise they stay ``None``.
    """
    protocol = None
    if patch.session_protocol is not None:
        protocol = models.SessionProtocol(patch.session_protocol.value)

    ingress_annotations = None
    if patch.session_ingress_annotations is not None:
        ingress_annotations = patch.session_ingress_annotations.model_dump()

    return models.ClusterPatch(
        name=patch.name,
        config_name=patch.config_name,
        session_protocol=protocol,
        session_host=patch.session_host,
        session_port=patch.session_port,
        session_path=patch.session_path,
        session_ingress_class_name=patch.session_ingress_class_name,
        session_ingress_annotations=ingress_annotations,
        session_tls_secret_name=patch.session_tls_secret_name,
        session_storage_class=patch.session_storage_class,
        service_account_name=patch.service_account_name,
    )
402

403

404
def validate_remote(body: apispec.RemoteConfigurationFirecrest) -> models.RemoteConfigurationFirecrest:
    """Check a remote configuration payload and convert it to its model.

    Only the ``firecrest`` kind is accepted, and the API URL is checked with
    ``validate_firecrest_api_url``.
    """
    kind = models.RemoteConfigurationKind(body.kind.value)
    if kind == models.RemoteConfigurationKind.firecrest:
        validate_firecrest_api_url(body.api_url)
        return models.RemoteConfigurationFirecrest(
            kind=kind,
            provider_id=body.provider_id,
            api_url=body.api_url,
            system_name=body.system_name,
            partition=body.partition,
        )
    raise errors.ValidationError(message=f"The kind '{kind}' of remote configuration is not supported.", quiet=True)
417

418

419
def validate_remote_put(
    body: apispec.RemoteConfigurationFirecrest | None,
) -> models.RemoteConfigurationPatch:
    """Convert the remote configuration of a PUT request into a patch.

    A missing configuration yields ``RESET``; otherwise the payload is validated with
    ``validate_remote`` and every field is copied into the patch.
    """
    if body is not None:
        validated = validate_remote(body=body)
        return models.RemoteConfigurationFirecrestPatch(
            kind=validated.kind,
            provider_id=validated.provider_id,
            api_url=validated.api_url,
            system_name=validated.system_name,
            partition=validated.partition,
        )
    return RESET
433

434

435
def validate_remote_patch(
    body: apispec.RemoteConfigurationPatchReset | apispec.RemoteConfigurationFirecrestPatch,
) -> models.RemoteConfigurationPatch:
    """Convert the remote configuration of a PATCH request into a patch.

    A reset payload yields ``RESET``. Otherwise the kind (when given) must be ``firecrest``
    and the API URL (when given) is checked with ``validate_firecrest_api_url``.
    """
    if isinstance(body, apispec.RemoteConfigurationPatchReset):
        return RESET

    kind = None
    if body.kind:
        kind = models.RemoteConfigurationKind(body.kind.value)
    if kind and kind != models.RemoteConfigurationKind.firecrest:
        raise errors.ValidationError(message=f"The kind '{kind}' of remote configuration is not supported.", quiet=True)

    api_url = body.api_url
    if api_url:
        validate_firecrest_api_url(api_url)

    return models.RemoteConfigurationFirecrestPatch(
        kind=kind,
        provider_id=body.provider_id,
        api_url=api_url,
        system_name=body.system_name,
        partition=body.partition,
    )
453

454

455
def validate_firecrest_api_url(url: str) -> None:
    """Validate the URL to the FirecREST API.

    The URL must be parseable, have a non-empty host and use the ``https`` scheme;
    otherwise a ``ValidationError`` is raised. Nothing is returned for a valid URL.
    """
    try:
        parsed = urlparse(url)
    except ValueError as err:
        # urlparse itself rejects some malformed inputs (e.g. an unbalanced IPv6 bracket);
        # report those as a validation problem instead of letting the ValueError escape.
        raise errors.ValidationError(
            message=f"The firecrest api url {url} is not valid: {err}.",
            quiet=True,
        ) from err
    if not parsed.netloc:
        raise errors.ValidationError(
            message=f"The host for the firecrest api url {url} is not valid, expected a non-empty value.",
            quiet=True,
        )
    accepted_schemes = ["https"]
    if parsed.scheme not in accepted_schemes:
        raise errors.ValidationError(
            message=f"The scheme for the firecrest api url {url} is not valid, expected one of {accepted_schemes}",
            quiet=True,
        )
469

470

471
def __validate_runtime_platform(body: apispec.RuntimePlatform | None) -> models.RuntimePlatform:
    """Convert the 'platform' field of a resource pool payload to its model value.

    When no platform is given, ``linux_amd64`` is used. A ``ValidationError`` is raised
    for a value that is not a member of ``models.RuntimePlatform``.
    """
    platform_str: str = body.value if body else models.RuntimePlatform.linux_amd64
    if platform_str in models.RuntimePlatform:
        return models.RuntimePlatform(platform_str)
    raise errors.ValidationError(
        message=(
            f"Invalid value for the field 'platform': {body}: "
            f"Valid values are {[e.value for e in models.RuntimePlatform]}"
        )
    )
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc