• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19171102871

07 Nov 2025 02:18PM UTC coverage: 86.819% (-0.02%) from 86.838%
19171102871

Pull #1102

github

web-flow
Merge 8d0671560 into b462e7159
Pull Request #1102: ignore: test PR for codeql action update

22849 of 26318 relevant lines covered (86.82%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.98
/components/renku_data_services/k8s/db.py
1
"""K8s watcher database and k8s wrappers."""
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import AsyncIterable, Callable
2✔
6
from dataclasses import dataclass, field
2✔
7
from typing import Optional
2✔
8
from uuid import uuid4
2✔
9

10
import sqlalchemy
2✔
11
from kubernetes import client
2✔
12
from kubernetes.utils import parse_quantity
2✔
13
from sqlalchemy import Select, bindparam, select
2✔
14
from sqlalchemy.dialects.postgresql import JSONB
2✔
15
from sqlalchemy.ext.asyncio import AsyncSession
2✔
16

17
from renku_data_services.crc import models
2✔
18
from renku_data_services.errors import errors
2✔
19
from renku_data_services.k8s.client_interfaces import PriorityClassClient, ResourceQuotaClient
2✔
20
from renku_data_services.k8s.models import K8sObject, K8sObjectFilter, K8sObjectMeta
2✔
21
from renku_data_services.k8s.orm import K8sObjectORM
2✔
22

23

24
class K8sDbCache:
    """Caching k8s objects in postgres.

    Every public method opens its own session from the session maker and runs inside a single
    transaction that is committed when the ``session.begin()`` block exits without an error.
    """

    def __init__(self, session_maker: Callable[..., AsyncSession]) -> None:
        """Store the factory used to open a new database session for each operation."""
        self.__session_maker = session_maker

    @staticmethod
    def __get_where_clauses(_filter: K8sObjectFilter) -> Select[tuple[K8sObjectORM]]:
        """Build the select statement for cached objects that match the filter.

        Filter fields that are None do not restrict the result.
        """
        stmt = select(K8sObjectORM)
        if _filter.name is not None:
            stmt = stmt.where(K8sObjectORM.name == _filter.name)
        if _filter.namespace is not None:
            stmt = stmt.where(K8sObjectORM.namespace == _filter.namespace)
        if _filter.cluster is not None:
            stmt = stmt.where(K8sObjectORM.cluster == str(_filter.cluster))
        if _filter.gvk is not None:
            stmt = stmt.where(K8sObjectORM.kind_insensitive == _filter.gvk.kind)
            stmt = stmt.where(K8sObjectORM.version_insensitive == _filter.gvk.version)
            if _filter.gvk.group is None:
                # A gvk without a group only matches rows where the group column is NULL.
                stmt = stmt.where(K8sObjectORM.group.is_(None))
            else:
                stmt = stmt.where(K8sObjectORM.group_insensitive == _filter.gvk.group)
        if _filter.user_id is not None:
            stmt = stmt.where(K8sObjectORM.user_id == _filter.user_id)
        if _filter.label_selector is not None:
            # JSONB containment: the labels stored in the manifest have to include every
            # key/value pair of the selector. The selector is passed as a bound parameter.
            stmt = stmt.where(
                sqlalchemy.text("manifest -> 'metadata' -> 'labels' @> :labels").bindparams(
                    bindparam("labels", _filter.label_selector, type_=JSONB)
                )
            )
        return stmt

    async def __get(self, meta: K8sObjectMeta, session: AsyncSession) -> K8sObjectORM | None:
        """Return the first cached row matching the object metadata, or None if there is none."""
        stmt = self.__get_where_clauses(meta.to_filter())
        return await session.scalar(stmt)

    async def upsert(self, obj: K8sObject) -> None:
        """Insert or update an object in the cache.

        Raises:
            errors.ValidationError: if the object does not have a user_id.
        """
        if obj.user_id is None:
            raise errors.ValidationError(message="user_id is required to upsert k8s object.")
        # NOTE: session.begin() commits when the block exits successfully (also on an early return)
        # and rolls back on error, so there is no explicit commit or flush here, same as in delete().
        async with self.__session_maker() as session, session.begin():
            obj_orm = await self.__get(obj, session)
            if obj_orm is not None:
                # NOTE(review): the insert path below stores obj.manifest.to_dict() while this path
                # assigns obj.manifest as is - confirm the column type accepts both.
                obj_orm.manifest = obj.manifest
                return
            session.add(
                K8sObjectORM(
                    name=obj.name,
                    namespace=obj.namespace or "default",
                    group=obj.gvk.group,
                    kind=obj.gvk.kind,
                    version=obj.gvk.version,
                    manifest=obj.manifest.to_dict(),
                    cluster=obj.cluster,
                    user_id=obj.user_id,
                )
            )

    async def delete(self, meta: K8sObjectMeta) -> None:
        """Delete an object from the cache, does nothing if the object is not cached."""
        async with self.__session_maker() as session, session.begin():
            obj_orm = await self.__get(meta, session)
            if obj_orm is not None:
                await session.delete(obj_orm)

    async def get(self, meta: K8sObjectMeta) -> K8sObject | None:
        """Get a single object from the cache, None if it is not cached."""
        async with self.__session_maker() as session, session.begin():
            obj_orm = await self.__get(meta, session)
            if obj_orm is not None:
                return meta.with_manifest(obj_orm.manifest)

        return None

    async def list(self, _filter: K8sObjectFilter) -> AsyncIterable[K8sObject]:
        """List objects from the cache.

        The rows are streamed, the session stays open until the iteration is finished.
        """
        async with self.__session_maker() as session, session.begin():
            stmt = self.__get_where_clauses(_filter)
            async for res in await session.stream_scalars(stmt):
                yield res.dump()
110

111

112
@dataclass
class QuotaRepository:
    """Adapter for CRUD operations on resource quotas and priority classes in k8s.

    Every quota is a pair of a namespaced resource quota and a cluster-scoped priority class
    that share the same name (the quota id).
    """

    rq_client: ResourceQuotaClient
    pc_client: PriorityClassClient
    namespace: str = "default"
    # Label set on every quota and priority class created here, also used to select quotas when listing.
    _label_name: str = field(init=False, default="app")
    _label_value: str = field(init=False, default="renku")

    def _quota_from_manifest(self, manifest: client.V1ResourceQuota) -> models.Quota:
        """Convert a k8s resource quota into a quota model.

        The first GPU kind that has a request limit in the manifest is used, without any GPU
        limit the quota has 0 GPUs of kind NVIDIA. Memory is converted from bytes to
        (rounded) gigabytes.

        Raises:
            errors.ValidationError: if the cpu or memory request limit is missing.
        """
        # A quota without any hard limits is handled like one with missing cpu/memory limits.
        hard = manifest.spec.hard or {}
        gpu = 0
        gpu_kind = models.GpuKind.NVIDIA
        for igpu_kind in models.GpuKind:
            key = f"requests.{igpu_kind}/gpu"
            if key in hard:
                gpu = int(hard[key])
                gpu_kind = igpu_kind
                break
        memory_raw = hard.get("requests.memory")
        if memory_raw is None:
            raise errors.ValidationError(
                message="Kubernetes resource quota with missing hard.requests.memory is not supported"
            )
        cpu_raw = hard.get("requests.cpu")
        if cpu_raw is None:
            raise errors.ValidationError(
                message="Kubernetes resource quota with missing hard.requests.cpu is not supported"
            )
        return models.Quota(
            cpu=float(parse_quantity(cpu_raw)),
            memory=round(parse_quantity(memory_raw) / 1_000_000_000),
            gpu=gpu,
            gpu_kind=gpu_kind,
            id=manifest.metadata.name,
        )

    def _quota_to_manifest(self, quota: models.Quota) -> client.V1ResourceQuota:
        """Convert a quota model into a k8s resource quota.

        The quota is scoped to the priority class that has the same name as the quota id.
        Memory is converted from gigabytes to bytes.
        """
        return client.V1ResourceQuota(
            metadata=client.V1ObjectMeta(labels={self._label_name: self._label_value}, name=quota.id),
            spec=client.V1ResourceQuotaSpec(
                hard={
                    "requests.cpu": quota.cpu,
                    "requests.memory": str(quota.memory * 1_000_000_000),
                    f"requests.{quota.gpu_kind}/gpu": quota.gpu,
                },
                scope_selector=client.V1ScopeSelector(
                    match_expressions=[{"operator": "In", "scopeName": "PriorityClass", "values": [quota.id]}]
                ),
            ),
        )

    def get_quota(self, name: str | None) -> Optional[models.Quota]:
        """Get a specific quota by name, None if the name is empty or the quota does not exist."""
        if not name:
            return None
        try:
            res_quota = self.rq_client.read_resource_quota(name=name, namespace=self.namespace)
        except client.ApiException as e:
            if e.status == 404:
                return None
            raise
        return self._quota_from_manifest(res_quota)

    def get_quotas(self, name: Optional[str] = None) -> list[models.Quota]:
        """List the quotas in the namespace, restricted to the quota with the given name if one is passed.

        Without a name only the resource quotas that carry the renku label are returned.
        """
        if name is not None:
            quota = self.get_quota(name)
            return [quota] if quota is not None else []
        quotas = self.rq_client.list_resource_quota(
            namespace=self.namespace, label_selector=f"{self._label_name}={self._label_value}"
        )
        return [self._quota_from_manifest(q) for q in quotas]

    def create_quota(self, new_quota: models.UnsavedQuota) -> models.Quota:
        """Create a resource quota and priority class.

        A new random id is generated and used as the name of both k8s resources.
        """
        quota_id = str(uuid4())
        quota = models.Quota(
            cpu=new_quota.cpu, memory=new_quota.memory, gpu=new_quota.gpu, gpu_kind=new_quota.gpu_kind, id=quota_id
        )
        metadata = {"labels": {self._label_name: self._label_value}, "name": quota_id}
        quota_manifest = self._quota_to_manifest(quota)

        # Check if we have a priority class with the given name, return it or create one otherwise.
        pc = self.pc_client.read_priority_class(quota.id)
        if pc is None:
            pc = self.pc_client.create_priority_class(
                client.V1PriorityClass(
                    global_default=False,
                    value=100,
                    preemption_policy="Never",
                    description="Renku resource quota priority class",
                    metadata=client.V1ObjectMeta(**metadata),
                ),
            )

        # NOTE: The priority class is cluster-scoped and a namespace-scoped resource cannot be an owner
        # of a cluster-scoped resource. That is why the priority class is an owner of the quota.
        # The owner references are part of the object metadata, an attribute set directly on the
        # resource quota is not a field of the model and is not sent to k8s.
        quota_manifest.metadata.owner_references = [
            client.V1OwnerReference(
                api_version=pc.api_version,
                block_owner_deletion=True,
                controller=False,
                kind=pc.kind,
                name=pc.metadata.name,
                uid=pc.metadata.uid,
            )
        ]
        self.rq_client.create_resource_quota(self.namespace, quota_manifest)
        return quota

    def delete_quota(self, name: str) -> None:
        """Delete a resource quota and priority class."""
        self.pc_client.delete_priority_class(name=name, body=client.V1DeleteOptions(propagation_policy="Foreground"))
        try:
            self.rq_client.delete_resource_quota(name=name, namespace=self.namespace)
        except client.ApiException as e:
            # The priority class owns the quota, so the k8s garbage collector may have already
            # removed the quota as part of deleting the priority class.
            if e.status != 404:
                raise

    def update_quota(self, quota: models.Quota) -> models.Quota:
        """Update a specific resource quota, the quota is patched with the manifest of the passed model."""
        quota_manifest = self._quota_to_manifest(quota)
        self.rq_client.patch_resource_quota(name=quota.id, namespace=self.namespace, body=quota_manifest)
        return quota
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc