• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 16369691288

18 Jul 2025 11:43AM UTC coverage: 87.186% (-0.04%) from 87.229%
16369691288

Pull #929

github

web-flow
Merge 03ce98685 into 8be58eb79
Pull Request #929: exp: session api proxy

8 of 21 new or added lines in 3 files covered. (38.1%)

13 existing lines in 5 files now uncovered.

21486 of 24644 relevant lines covered (87.19%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.49
/components/renku_data_services/crc/models.py
1
"""Domain models for the application."""
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import Callable
2✔
6
from copy import deepcopy
2✔
7
from dataclasses import asdict, dataclass, field
2✔
8
from enum import StrEnum
2✔
9
from typing import TYPE_CHECKING, Any, Optional, Protocol
2✔
10
from uuid import uuid4
2✔
11

12
from ulid import ULID
2✔
13

14
from renku_data_services import errors
2✔
15
from renku_data_services.errors import ValidationError
2✔
16

17
if TYPE_CHECKING:
2✔
18
    from renku_data_services.crc.apispec import Protocol as CrcApiProtocol
×
19

20

21
class ResourcesProtocol(Protocol):
2✔
22
    """Used to represent resource values present in a resource class or quota."""
23

24
    @property
2✔
25
    def cpu(self) -> float:
2✔
26
        """Cpu in fractional cores."""
27
        ...
×
28

29
    @property
2✔
30
    def gpu(self) -> int:
2✔
31
        """Number of GPUs."""
32
        ...
×
33

34
    @property
2✔
35
    def memory(self) -> int:
2✔
36
        """Memory in gigabytes."""
37
        ...
×
38

39
    @property
2✔
40
    def max_storage(self) -> Optional[int]:
2✔
41
        """Maximum allowable storage in gigabytes."""
42
        ...
×
43

44

45
class ResourcesCompareMixin:
2✔
46
    """A mixin that adds comparison operator support on ResourceClasses and Quotas."""
47

48
    def __compare(
2✔
49
        self,
50
        other: ResourcesProtocol,
51
        compare_func: Callable[[int | float, int | float], bool],
52
    ) -> bool:
53
        results = [
2✔
54
            compare_func(self.cpu, other.cpu),  # type: ignore[attr-defined]
55
            compare_func(self.memory, other.memory),  # type: ignore[attr-defined]
56
            compare_func(self.gpu, other.gpu),  # type: ignore[attr-defined]
57
        ]
58
        self_storage = getattr(self, "max_storage", 99999999999999999999999)
2✔
59
        other_storage = getattr(other, "max_storage", 99999999999999999999999)
2✔
60
        results.append(compare_func(self_storage, other_storage))
2✔
61
        return all(results)
2✔
62

63
    def __ge__(self, other: ResourcesProtocol) -> bool:
2✔
64
        return self.__compare(other, lambda x, y: x >= y)
×
65

66
    def __gt__(self, other: ResourcesProtocol) -> bool:
2✔
67
        return self.__compare(other, lambda x, y: x > y)
×
68

69
    def __lt__(self, other: ResourcesProtocol) -> bool:
2✔
70
        return self.__compare(other, lambda x, y: x < y)
×
71

72
    def __le__(self, other: ResourcesProtocol) -> bool:
2✔
73
        return self.__compare(other, lambda x, y: x <= y)
2✔
74

75

76
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
77
class NodeAffinity:
2✔
78
    """Used to set the node affinity when scheduling sessions."""
79

80
    key: str
2✔
81
    required_during_scheduling: bool = False
2✔
82

83
    @classmethod
2✔
84
    def from_dict(cls, data: dict) -> NodeAffinity:
2✔
85
        """Create a node affinity from a dictionary."""
86
        return cls(**data)
2✔
87

88

89
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
90
class ResourceClass(ResourcesCompareMixin):
2✔
91
    """Resource class model."""
92

93
    name: str
2✔
94
    cpu: float
2✔
95
    memory: int
2✔
96
    max_storage: int
2✔
97
    gpu: int
2✔
98
    id: Optional[int] = None
2✔
99
    default: bool = False
2✔
100
    default_storage: int = 1
2✔
101
    matching: Optional[bool] = None
2✔
102
    node_affinities: list[NodeAffinity] = field(default_factory=list)
2✔
103
    tolerations: list[str] = field(default_factory=list)
2✔
104
    quota: str | None = None
2✔
105

106
    def __post_init__(self) -> None:
2✔
107
        if len(self.name) > 40:
2✔
108
            raise ValidationError(message="'name' cannot be longer than 40 characters.")
1✔
109
        if self.default_storage > self.max_storage:
2✔
UNCOV
110
            raise ValidationError(message="The default storage cannot be larger than the max allowable storage.")
×
111
        # We need to sort node affinities and tolerations to make '__eq__' reliable
112
        object.__setattr__(
2✔
113
            self, "node_affinities", sorted(self.node_affinities, key=lambda x: (x.key, x.required_during_scheduling))
114
        )
115
        object.__setattr__(self, "tolerations", sorted(self.tolerations))
2✔
116

117
    @classmethod
2✔
118
    def from_dict(cls, data: dict) -> ResourceClass:
2✔
119
        """Create the model from a plain dictionary."""
120
        node_affinities: list[NodeAffinity] = []
2✔
121
        tolerations: list[str] = []
2✔
122
        quota: str | None = None
2✔
123
        if data.get("node_affinities"):
2✔
124
            node_affinities = [
2✔
125
                NodeAffinity.from_dict(affinity) if isinstance(affinity, dict) else affinity
126
                for affinity in data.get("node_affinities", [])
127
            ]
128
        if isinstance(data.get("tolerations"), list):
2✔
129
            tolerations = [toleration for toleration in data["tolerations"]]
2✔
130
        if data_quota := data.get("quota"):
2✔
131
            if isinstance(data_quota, str):
1✔
132
                quota = data_quota
1✔
133
            elif isinstance(data_quota, Quota):
×
134
                quota = data_quota.id
×
135
        return cls(**{**data, "tolerations": tolerations, "node_affinities": node_affinities, "quota": quota})
2✔
136

137
    def is_quota_valid(self, quota: Quota) -> bool:
2✔
138
        """Determine if a quota is compatible with the resource class."""
139
        return quota >= self
×
140

141
    def update(self, **kwargs: dict) -> ResourceClass:
2✔
142
        """Update a field of the resource class and return a new copy."""
143
        if not kwargs:
×
144
            return self
×
145
        return ResourceClass.from_dict({**asdict(self), **kwargs})
×
146

147

148
class GpuKind(StrEnum):
2✔
149
    """GPU kinds for k8s."""
150

151
    NVIDIA = "nvidia.com"
2✔
152
    AMD = "amd.com"
2✔
153

154

155
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
156
class Quota(ResourcesCompareMixin):
2✔
157
    """Quota model."""
158

159
    cpu: float
2✔
160
    memory: int
2✔
161
    gpu: int
2✔
162
    gpu_kind: GpuKind = GpuKind.NVIDIA
2✔
163
    id: str
2✔
164

165
    @classmethod
2✔
166
    def from_dict(cls, data: dict) -> Quota:
2✔
167
        """Create the model from a plain dictionary."""
168
        instance = deepcopy(data)
2✔
169

170
        match instance.get("gpu_kind"):
2✔
171
            case None:
2✔
172
                instance["gpu_kind"] = GpuKind.NVIDIA
2✔
173
            case GpuKind():
1✔
174
                pass
1✔
175
            case x:
×
176
                instance["gpu_kind"] = GpuKind[x]
×
177

178
        match instance.get("id"):
2✔
179
            case None:
2✔
180
                instance["id"] = str(uuid4())
1✔
181
            case "":
2✔
182
                instance["id"] = str(uuid4())
×
183

184
        return cls(**instance)
2✔
185

186
    def is_resource_class_compatible(self, rc: ResourceClass) -> bool:
2✔
187
        """Determine if a resource class is compatible with the quota."""
188
        return rc <= self
2✔
189

190

191
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
192
class ClusterPatch:
2✔
193
    """K8s Cluster settings patch."""
194

195
    name: str | None
2✔
196
    config_name: str | None
2✔
197
    session_protocol: CrcApiProtocol | None
2✔
198
    session_host: str | None
2✔
199
    session_port: int | None
2✔
200
    session_path: str | None
2✔
201
    session_ingress_annotations: dict[str, Any] | None
2✔
202
    session_tls_secret_name: str | None
2✔
203
    session_storage_class: str | None
2✔
204

205

206
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
207
class Cluster:
2✔
208
    """K8s Cluster settings."""
209

210
    name: str
2✔
211
    config_name: str
2✔
212
    session_protocol: CrcApiProtocol
2✔
213
    session_host: str
2✔
214
    session_port: int
2✔
215
    session_path: str
2✔
216
    session_ingress_annotations: dict[str, Any]
2✔
217
    session_tls_secret_name: str
2✔
218
    session_storage_class: str | None
2✔
219

220
    def to_cluster_patch(self) -> ClusterPatch:
2✔
221
        """Convert to ClusterPatch."""
222

223
        return ClusterPatch(
2✔
224
            name=self.name,
225
            config_name=self.config_name,
226
            session_protocol=self.session_protocol,
227
            session_host=self.session_host,
228
            session_port=self.session_port,
229
            session_path=self.session_path,
230
            session_ingress_annotations=self.session_ingress_annotations,
231
            session_tls_secret_name=self.session_tls_secret_name,
232
            session_storage_class=self.session_storage_class,
233
        )
234

235

236
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
237
class SavedCluster(Cluster):
2✔
238
    """K8s Cluster settings from the DB."""
239

240
    id: ULID
2✔
241

242

243
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
244
class ResourcePool:
2✔
245
    """Resource pool model."""
246

247
    name: str
2✔
248
    classes: list[ResourceClass]
2✔
249
    quota: Quota | None = None
2✔
250
    id: int | None = None
2✔
251
    idle_threshold: int | None = None
2✔
252
    hibernation_threshold: int | None = None
2✔
253
    default: bool = False
2✔
254
    public: bool = False
2✔
255
    cluster: SavedCluster | None = None
2✔
256

257
    def __post_init__(self) -> None:
2✔
258
        """Validate the resource pool after initialization."""
259
        if len(self.name) > 40:
2✔
260
            raise ValidationError(message="'name' cannot be longer than 40 characters.")
1✔
261
        if self.default and not self.public:
2✔
262
            raise ValidationError(message="The default resource pool has to be public.")
×
263
        if self.default and self.quota is not None:
2✔
264
            raise ValidationError(message="A default resource pool cannot have a quota.")
×
265
        if (self.idle_threshold and self.idle_threshold < 0) or (
2✔
266
            self.hibernation_threshold and self.hibernation_threshold < 0
267
        ):
268
            raise ValidationError(message="Idle threshold and hibernation threshold need to be larger than 0.")
×
269

270
        if self.idle_threshold == 0:
2✔
271
            object.__setattr__(self, "idle_threshold", None)
×
272
        if self.hibernation_threshold == 0:
2✔
273
            object.__setattr__(self, "hibernation_threshold", None)
×
274

275
        default_classes = []
2✔
276
        for cls in list(self.classes):
2✔
277
            if self.quota is not None and not self.quota.is_resource_class_compatible(cls):
2✔
278
                raise ValidationError(
1✔
279
                    message=f"The resource class with name {cls.name} is not compatible with the quota."
280
                )
281
            if cls.default:
2✔
282
                default_classes.append(cls)
2✔
283
        if len(default_classes) != 1:
2✔
284
            raise ValidationError(message="One default class is required in each resource pool.")
1✔
285

286
    def set_quota(self, val: Quota) -> ResourcePool:
2✔
287
        """Set the quota for a resource pool."""
288
        for cls in list(self.classes):
1✔
289
            if not val.is_resource_class_compatible(cls):
1✔
290
                raise ValidationError(
×
291
                    message=f"The resource class with name {cls.name} is not compatible with the quota."
292
                )
293
        return self.from_dict({**asdict(self), "quota": val})
1✔
294

295
    def update(self, **kwargs: Any) -> ResourcePool:
2✔
296
        """Determine if an update to a resource pool is valid and if valid create new updated resource pool."""
297
        if self.default and "default" in kwargs and not kwargs["default"]:
1✔
298
            raise ValidationError(message="A default resource pool cannot be made non-default.")
×
299
        return ResourcePool.from_dict({**asdict(self), **kwargs})
1✔
300

301
    @classmethod
2✔
302
    def from_dict(cls, data: dict) -> ResourcePool:
2✔
303
        """Create the model from a plain dictionary."""
304
        cluster: SavedCluster | None = None
2✔
305
        quota: Quota | None = None
2✔
306
        classes: list[ResourceClass] = []
2✔
307

308
        if "quota" in data and isinstance(data["quota"], dict):
2✔
309
            quota = Quota.from_dict(data["quota"])
2✔
310
        elif "quota" in data and isinstance(data["quota"], Quota):
2✔
311
            quota = data["quota"]
1✔
312

313
        if "classes" in data and isinstance(data["classes"], set):
2✔
314
            classes = [ResourceClass.from_dict(c) if isinstance(c, dict) else c for c in list(data["classes"])]
×
315
        elif "classes" in data and isinstance(data["classes"], list):
2✔
316
            classes = [ResourceClass.from_dict(c) if isinstance(c, dict) else c for c in data["classes"]]
2✔
317

318
        match tmp := data.get("cluster"):
2✔
319
            case SavedCluster():
2✔
320
                # This has to be before the dict() case, as this is also an instance of dict.
321
                cluster = tmp
1✔
322
            case dict():
2✔
323
                cluster = SavedCluster(
1✔
324
                    name=tmp["name"],
325
                    config_name=tmp["config_name"],
326
                    session_protocol=tmp["session_protocol"],
327
                    session_host=tmp["session_host"],
328
                    session_port=tmp["session_port"],
329
                    session_path=tmp["session_path"],
330
                    session_ingress_annotations=tmp["session_ingress_annotations"],
331
                    session_tls_secret_name=tmp["session_tls_secret_name"],
332
                    session_storage_class=tmp["session_storage_class"],
333
                    id=tmp["id"],
334
                )
335
            case None:
2✔
336
                cluster = None
2✔
337
            case unknown:
×
338
                raise errors.ValidationError(message=f"Got unexpected cluster data {unknown} when creating model")
×
339

340
        return cls(
2✔
341
            name=data["name"],
342
            id=data.get("id"),
343
            classes=classes,
344
            quota=quota,
345
            default=data.get("default", False),
346
            public=data.get("public", False),
347
            idle_threshold=data.get("idle_threshold"),
348
            hibernation_threshold=data.get("hibernation_threshold"),
349
            cluster=cluster,
350
        )
351

352
    def get_resource_class(self, resource_class_id: int) -> ResourceClass | None:
2✔
353
        """Find a specific resource class in the resource pool by the resource class id."""
354
        for rc in self.classes:
×
355
            if rc.id == resource_class_id:
×
356
                return rc
×
357
        return None
×
358

359
    def get_default_resource_class(self) -> ResourceClass | None:
2✔
360
        """Find the default resource class in the pool."""
361
        for rc in self.classes:
×
362
            if rc.default:
×
363
                return rc
×
364
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc