• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 18123243513

30 Sep 2025 08:10AM UTC coverage: 86.702% (-0.01%) from 86.714%
18123243513

Pull #1019

github

web-flow
Merge e726c4543 into 0690bab65
Pull Request #1019: feat: Attempt to support dockerhub private images

70 of 101 new or added lines in 9 files covered. (69.31%)

106 existing lines in 6 files now uncovered.

22357 of 25786 relevant lines covered (86.7%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.47
/components/renku_data_services/crc/models.py
1
"""Domain models for the application."""
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import Callable
2✔
6
from copy import deepcopy
2✔
7
from dataclasses import asdict, dataclass, field
2✔
8
from enum import StrEnum
2✔
9
from typing import Any, Optional, Protocol
2✔
10
from uuid import uuid4
2✔
11

12
from renku_data_services import errors
2✔
13
from renku_data_services.errors import ValidationError
2✔
14
from renku_data_services.k8s.constants import ClusterId
2✔
15
from renku_data_services.notebooks.cr_amalthea_session import TlsSecret
2✔
16

17

18
class ResourcesProtocol(Protocol):
    """Structural type for objects exposing resource amounts.

    It describes the read-only values that a resource class or a quota
    makes available for comparison.
    """

    @property
    def cpu(self) -> float:
        """Amount of CPU, in fractional cores."""
        ...

    @property
    def memory(self) -> int:
        """Amount of memory, in gigabytes."""
        ...

    @property
    def gpu(self) -> int:
        """How many GPUs there are."""
        ...

    @property
    def max_storage(self) -> Optional[int]:
        """Upper bound on storage, in gigabytes."""
        ...
×
40

41

42
class ResourcesCompareMixin:
    """A mixin that adds ``<``, ``<=``, ``>`` and ``>=`` to resource classes and quotas.

    A comparison holds only when it holds for every dimension at once: cpu,
    memory, gpu and maximum storage. An object that has no ``max_storage``
    attribute, or whose ``max_storage`` is ``None``, is treated as having
    unbounded storage.
    """

    # Stand-in for "no storage limit"; infinity compares correctly against any int.
    _UNLIMITED_STORAGE = float("inf")

    @staticmethod
    def _storage_limit(resources: object) -> int | float:
        """Return the storage limit of ``resources``, or infinity when it is absent or None."""
        limit = getattr(resources, "max_storage", None)
        return ResourcesCompareMixin._UNLIMITED_STORAGE if limit is None else limit

    def __compare(
        self,
        other: ResourcesProtocol,
        compare_func: Callable[[int | float, int | float], bool],
    ) -> bool:
        """Apply ``compare_func`` to each resource dimension and require all of them to hold.

        Args:
            other: The object to compare against.
            compare_func: Binary predicate applied as ``compare_func(own, theirs)``.

        Returns:
            True only if the predicate is true for cpu, memory, gpu and storage.
        """
        results = [
            compare_func(self.cpu, other.cpu),  # type: ignore[attr-defined]
            compare_func(self.memory, other.memory),  # type: ignore[attr-defined]
            compare_func(self.gpu, other.gpu),  # type: ignore[attr-defined]
            compare_func(self._storage_limit(self), self._storage_limit(other)),
        ]
        return all(results)

    def __ge__(self, other: ResourcesProtocol) -> bool:
        """Return True if every dimension of self is >= that of other."""
        return self.__compare(other, lambda x, y: x >= y)

    def __gt__(self, other: ResourcesProtocol) -> bool:
        """Return True if every dimension of self is > that of other."""
        return self.__compare(other, lambda x, y: x > y)

    def __lt__(self, other: ResourcesProtocol) -> bool:
        """Return True if every dimension of self is < that of other."""
        return self.__compare(other, lambda x, y: x < y)

    def __le__(self, other: ResourcesProtocol) -> bool:
        """Return True if every dimension of self is <= that of other."""
        return self.__compare(other, lambda x, y: x <= y)
2✔
71

72

73
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
74
class NodeAffinity:
2✔
75
    """Used to set the node affinity when scheduling sessions."""
76

77
    key: str
2✔
78
    required_during_scheduling: bool = False
2✔
79

80
    @classmethod
2✔
81
    def from_dict(cls, data: dict) -> NodeAffinity:
2✔
82
        """Create a node affinity from a dictionary."""
83
        return cls(**data)
2✔
84

85

86
@dataclass(frozen=True, eq=True, kw_only=True)
class ResourceClass(ResourcesCompareMixin):
    """Resource class model.

    Instances are frozen. Node affinities and tolerations are stored sorted so
    that two instances built from the same entries in a different order still
    compare equal.
    """

    name: str
    cpu: float
    memory: int
    max_storage: int
    gpu: int
    id: Optional[int] = None
    default: bool = False
    default_storage: int = 1
    matching: Optional[bool] = None
    node_affinities: list[NodeAffinity] = field(default_factory=list)
    tolerations: list[str] = field(default_factory=list)
    quota: str | None = None

    def __post_init__(self) -> None:
        """Validate the name length and storage bounds, then normalise list ordering.

        Raises:
            ValidationError: If the name exceeds 40 characters or the default
                storage is greater than the maximum storage.
        """
        if len(self.name) > 40:
            raise ValidationError(message="'name' cannot be longer than 40 characters.")
        if self.default_storage > self.max_storage:
            raise ValidationError(message="The default storage cannot be larger than the max allowable storage.")
        # The dataclass is frozen, so assignments have to bypass its __setattr__.
        # Sorting gives a canonical order, which keeps '__eq__' reliable.
        ordered_affinities = sorted(
            self.node_affinities,
            key=lambda affinity: (affinity.key, affinity.required_during_scheduling),
        )
        object.__setattr__(self, "node_affinities", ordered_affinities)
        ordered_tolerations = sorted(self.tolerations)
        object.__setattr__(self, "tolerations", ordered_tolerations)

    @classmethod
    def from_dict(cls, data: dict) -> ResourceClass:
        """Build a resource class from a plain dictionary.

        Node affinities given as dictionaries are converted to ``NodeAffinity``;
        tolerations are copied only when they are a list; a quota may be given
        either as its id string or as a ``Quota``, whose id is then used.
        """
        raw_affinities = data.get("node_affinities")
        node_affinities: list[NodeAffinity] = []
        if raw_affinities:
            node_affinities = [
                NodeAffinity.from_dict(item) if isinstance(item, dict) else item for item in raw_affinities
            ]

        raw_tolerations = data.get("tolerations")
        tolerations: list[str] = list(raw_tolerations) if isinstance(raw_tolerations, list) else []

        raw_quota = data.get("quota")
        quota: str | None = None
        if raw_quota:
            if isinstance(raw_quota, str):
                quota = raw_quota
            elif isinstance(raw_quota, Quota):
                quota = raw_quota.id

        overrides = {"tolerations": tolerations, "node_affinities": node_affinities, "quota": quota}
        return cls(**{**data, **overrides})

    def is_quota_valid(self, quota: Quota) -> bool:
        """Tell whether the given quota is at least as large as this resource class."""
        return quota >= self

    def update(self, **kwargs: dict) -> ResourceClass:
        """Return a copy with the given fields replaced, or self when nothing is given."""
        if kwargs:
            return ResourceClass.from_dict({**asdict(self), **kwargs})
        return self
×
143

144

145
class GpuKind(StrEnum):
2✔
146
    """GPU kinds for k8s."""
147

148
    NVIDIA = "nvidia.com"
2✔
149
    AMD = "amd.com"
2✔
150

151

152
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
153
class Quota(ResourcesCompareMixin):
2✔
154
    """Quota model."""
155

156
    cpu: float
2✔
157
    memory: int
2✔
158
    gpu: int
2✔
159
    gpu_kind: GpuKind = GpuKind.NVIDIA
2✔
160
    id: str
2✔
161

162
    @classmethod
2✔
163
    def from_dict(cls, data: dict) -> Quota:
2✔
164
        """Create the model from a plain dictionary."""
165
        instance = deepcopy(data)
2✔
166

167
        match instance.get("gpu_kind"):
2✔
168
            case None:
2✔
169
                instance["gpu_kind"] = GpuKind.NVIDIA
2✔
170
            case GpuKind():
1✔
171
                pass
1✔
172
            case x:
×
173
                instance["gpu_kind"] = GpuKind[x]
×
174

175
        match instance.get("id"):
2✔
176
            case None:
2✔
177
                instance["id"] = str(uuid4())
1✔
178
            case "":
2✔
179
                instance["id"] = str(uuid4())
×
180

181
        return cls(**instance)
2✔
182

183
    def is_resource_class_compatible(self, rc: ResourceClass) -> bool:
2✔
184
        """Determine if a resource class is compatible with the quota."""
185
        return rc <= self
2✔
186

187

188
class SessionProtocol(StrEnum):
2✔
189
    """Valid Session protocol values."""
190

191
    HTTP = "http"
2✔
192
    HTTPS = "https"
2✔
193

194

195
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
196
class ClusterPatch:
2✔
197
    """K8s Cluster settings patch."""
198

199
    name: str | None
2✔
200
    config_name: str | None
2✔
201
    session_protocol: SessionProtocol | None
2✔
202
    session_host: str | None
2✔
203
    session_port: int | None
2✔
204
    session_path: str | None
2✔
205
    session_ingress_annotations: dict[str, Any] | None
2✔
206
    session_tls_secret_name: str | None
2✔
207
    session_storage_class: str | None
2✔
208
    service_account_name: str | None
2✔
209

210

211
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
212
class ClusterSettings:
2✔
213
    """K8s Cluster settings."""
214

215
    name: str
2✔
216
    config_name: str
2✔
217
    session_protocol: SessionProtocol
2✔
218
    session_host: str
2✔
219
    session_port: int
2✔
220
    session_path: str
2✔
221
    session_ingress_annotations: dict[str, Any]
2✔
222
    session_tls_secret_name: str
2✔
223
    session_storage_class: str | None
2✔
224
    service_account_name: str | None = None
2✔
225

226
    def to_cluster_patch(self) -> ClusterPatch:
2✔
227
        """Convert to ClusterPatch."""
228

229
        return ClusterPatch(
2✔
230
            name=self.name,
231
            config_name=self.config_name,
232
            session_protocol=self.session_protocol,
233
            session_host=self.session_host,
234
            session_port=self.session_port,
235
            session_path=self.session_path,
236
            session_ingress_annotations=self.session_ingress_annotations,
237
            session_tls_secret_name=self.session_tls_secret_name,
238
            session_storage_class=self.session_storage_class,
239
            service_account_name=self.service_account_name,
240
        )
241

242

243
@dataclass(frozen=True, eq=True, kw_only=True)
class SavedClusterSettings(ClusterSettings):
    """K8s cluster settings that also carry the cluster id."""

    id: ClusterId

    def get_storage_class(self) -> str | None:
        """Return the session storage class configured for the cluster, if any."""

        storage_class = self.session_storage_class
        return storage_class

    def get_ingress_parameters(self, server_name: str) -> tuple[str, str, str, str, TlsSecret | None, dict[str, str]]:
        """Compute the ingress parameters of the cluster for the given server.

        Returns:
            A tuple of the server path, the server URL, the server URL again
            (in the "https URL" position), the session host, the TLS secret
            (or None when no secret name is set) and the ingress annotations.
        """

        server_path = f"{self.session_path}/{server_name}"
        server_url = f"{self.session_protocol.value}://{self.session_host}:{self.session_port}{server_path}"

        # NOTE(review): the "https" URL is the same string as the regular URL, whatever the protocol — confirm intended.
        secret_name = self.session_tls_secret_name
        if secret_name is None:
            tls_secret = None
        else:
            tls_secret = TlsSecret(adopt=False, name=secret_name)

        return (
            server_path,
            server_url,
            server_url,
            self.session_host,
            tls_secret,
            self.session_ingress_annotations,
        )
×
268

269

270
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
271
class ResourcePool:
2✔
272
    """Resource pool model."""
273

274
    name: str
2✔
275
    classes: list[ResourceClass]
2✔
276
    quota: Quota | None = None
2✔
277
    id: int | None = None
2✔
278
    idle_threshold: int | None = None
2✔
279
    hibernation_threshold: int | None = None
2✔
280
    default: bool = False
2✔
281
    public: bool = False
2✔
282
    cluster: SavedClusterSettings | None = None
2✔
283

284
    def __post_init__(self) -> None:
2✔
285
        """Validate the resource pool after initialization."""
286
        if len(self.name) > 40:
2✔
287
            raise ValidationError(message="'name' cannot be longer than 40 characters.")
1✔
288
        if self.default and not self.public:
2✔
289
            raise ValidationError(message="The default resource pool has to be public.")
×
290
        if self.default and self.quota is not None:
2✔
291
            raise ValidationError(message="A default resource pool cannot have a quota.")
×
292
        if (self.idle_threshold and self.idle_threshold < 0) or (
2✔
293
            self.hibernation_threshold and self.hibernation_threshold < 0
294
        ):
295
            raise ValidationError(message="Idle threshold and hibernation threshold need to be larger than 0.")
×
296

297
        if self.idle_threshold == 0:
2✔
298
            object.__setattr__(self, "idle_threshold", None)
×
299
        if self.hibernation_threshold == 0:
2✔
300
            object.__setattr__(self, "hibernation_threshold", None)
×
301

302
        default_classes = []
2✔
303
        for cls in list(self.classes):
2✔
304
            if self.quota is not None and not self.quota.is_resource_class_compatible(cls):
2✔
305
                raise ValidationError(
1✔
306
                    message=f"The resource class with name {cls.name} is not compatible with the quota."
307
                )
308
            if cls.default:
2✔
309
                default_classes.append(cls)
2✔
310
        if len(default_classes) != 1:
2✔
311
            raise ValidationError(message="One default class is required in each resource pool.")
1✔
312

313
    def set_quota(self, val: Quota) -> ResourcePool:
2✔
314
        """Set the quota for a resource pool."""
315
        for cls in list(self.classes):
1✔
316
            if not val.is_resource_class_compatible(cls):
1✔
317
                raise ValidationError(
×
318
                    message=f"The resource class with name {cls.name} is not compatible with the quota."
319
                )
320
        return self.from_dict({**asdict(self), "quota": val})
1✔
321

322
    def update(self, **kwargs: Any) -> ResourcePool:
2✔
323
        """Determine if an update to a resource pool is valid and if valid create new updated resource pool."""
324
        if self.default and "default" in kwargs and not kwargs["default"]:
1✔
325
            raise ValidationError(message="A default resource pool cannot be made non-default.")
×
326
        return ResourcePool.from_dict({**asdict(self), **kwargs})
1✔
327

328
    @classmethod
2✔
329
    def from_dict(cls, data: dict) -> ResourcePool:
2✔
330
        """Create the model from a plain dictionary."""
331
        cluster: SavedClusterSettings | None = None
2✔
332
        quota: Quota | None = None
2✔
333
        classes: list[ResourceClass] = []
2✔
334

335
        if "quota" in data and isinstance(data["quota"], dict):
2✔
336
            quota = Quota.from_dict(data["quota"])
2✔
337
        elif "quota" in data and isinstance(data["quota"], Quota):
2✔
338
            quota = data["quota"]
1✔
339

340
        if "classes" in data and isinstance(data["classes"], set):
2✔
341
            classes = [ResourceClass.from_dict(c) if isinstance(c, dict) else c for c in list(data["classes"])]
×
342
        elif "classes" in data and isinstance(data["classes"], list):
2✔
343
            classes = [ResourceClass.from_dict(c) if isinstance(c, dict) else c for c in data["classes"]]
2✔
344

345
        match tmp := data.get("cluster"):
2✔
346
            case SavedClusterSettings():
2✔
347
                # This has to be before the dict() case, as this is also an instance of dict.
348
                cluster = tmp
1✔
349
            case dict():
2✔
350
                cluster = SavedClusterSettings(
1✔
351
                    name=tmp["name"],
352
                    config_name=tmp["config_name"],
353
                    session_protocol=tmp["session_protocol"],
354
                    session_host=tmp["session_host"],
355
                    session_port=tmp["session_port"],
356
                    session_path=tmp["session_path"],
357
                    session_ingress_annotations=tmp["session_ingress_annotations"],
358
                    session_tls_secret_name=tmp["session_tls_secret_name"],
359
                    session_storage_class=tmp["session_storage_class"],
360
                    id=tmp["id"],
361
                    service_account_name=tmp.get("service_account_name"),
362
                )
363
            case None:
2✔
364
                cluster = None
2✔
365
            case unknown:
×
366
                raise errors.ValidationError(message=f"Got unexpected cluster data {unknown} when creating model")
×
367

368
        return cls(
2✔
369
            name=data["name"],
370
            id=data.get("id"),
371
            classes=classes,
372
            quota=quota,
373
            default=data.get("default", False),
374
            public=data.get("public", False),
375
            idle_threshold=data.get("idle_threshold"),
376
            hibernation_threshold=data.get("hibernation_threshold"),
377
            cluster=cluster,
378
        )
379

380
    def get_resource_class(self, resource_class_id: int) -> ResourceClass | None:
2✔
381
        """Find a specific resource class in the resource pool by the resource class id."""
382
        for rc in self.classes:
×
383
            if rc.id == resource_class_id:
×
384
                return rc
×
385
        return None
×
386

387
    def get_default_resource_class(self) -> ResourceClass | None:
2✔
388
        """Find the default resource class in the pool."""
389
        for rc in self.classes:
×
390
            if rc.default:
×
391
                return rc
×
392
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc