• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19181077268

07 Nov 2025 09:00PM UTC coverage: 86.352% (-0.5%) from 86.841%
19181077268

Pull #1059

github

web-flow
Merge fb47045e6 into 58a6e4765
Pull Request #1059: fix: patching of session custom resources

89 of 104 new or added lines in 5 files covered. (85.58%)

577 existing lines in 33 files now uncovered.

22791 of 26393 relevant lines covered (86.35%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.78
/components/renku_data_services/k8s/models.py
1
"""Models for the k8s watcher."""
2

3
from __future__ import annotations
2✔
4

5
from dataclasses import dataclass
2✔
6
from typing import Any, Self, cast
2✔
7

8
import kubernetes
2✔
9
from box import Box
2✔
10
from kr8s._api import Api
2✔
11
from kr8s.asyncio.objects import APIObject
2✔
12
from kr8s.objects import Secret
2✔
13
from kubernetes_asyncio.client import V1Secret
2✔
14

15
from renku_data_services.errors import errors
2✔
16
from renku_data_services.k8s.constants import DUMMY_TASK_RUN_USER_ID, ClusterId
2✔
17

18
# Shared helper used to turn kubernetes client model objects into plain data before wrapping
# them in a Box. It is the bound ``sanitize_for_serialization`` method of a throwaway ApiClient.
sanitizer = kubernetes.client.ApiClient().sanitize_for_serialization
19

20

21
class K8sObjectMeta:
    """Metadata about a k8s object.

    Holds what is needed to identify an object: its name, namespace, the cluster it lives in and
    its group/version/kind, plus an optional user id and a flag telling whether the resource kind
    is namespaced.
    """

    def __init__(
        self,
        name: str,
        namespace: str,
        cluster: ClusterId,
        gvk: GVK,
        user_id: str | None = None,
        namespaced: bool = True,
    ) -> None:
        self.name = name
        self.namespace = namespace
        self.cluster = cluster
        self.gvk = gvk
        self.user_id = user_id

        self.namespaced = namespaced

    def with_manifest(self, manifest: dict[str, Any]) -> K8sObject:
        """Convert to a full k8s object.

        The manifest is wrapped in a ``Box``; every metadata field, including ``namespaced``,
        is carried over unchanged.
        """
        return K8sObject(
            name=self.name,
            namespace=self.namespace,
            cluster=self.cluster,
            gvk=self.gvk,
            manifest=Box(manifest),
            user_id=self.user_id,
            # Without this a cluster-scoped meta would silently turn into a namespaced object.
            namespaced=self.namespaced,
        )

    def to_filter(self) -> K8sObjectFilter:
        """Convert the metadata to a filter used when listing resources."""
        return K8sObjectFilter(
            gvk=self.gvk,
            namespace=self.namespace,
            cluster=self.cluster,
            name=self.name,
            user_id=self.user_id,
        )

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(name={self.name}, namespace={self.namespace}, cluster={self.cluster}, "
            f"gvk={self.gvk}, user_id={self.user_id})"
        )
67

68

69
class K8sObject(K8sObjectMeta):
    """A k8s object: the identifying metadata together with its manifest."""

    def __init__(
        self,
        name: str,
        namespace: str,
        cluster: ClusterId,
        gvk: GVK,
        manifest: Box,
        user_id: str | None = None,
        namespaced: bool = True,
    ) -> None:
        super().__init__(
            name=name,
            namespace=namespace,
            cluster=cluster,
            gvk=gvk,
            user_id=user_id,
            namespaced=namespaced,
        )
        self.manifest = manifest

    def __repr__(self) -> str:
        description = (
            f"{self.__class__.__name__}(name={self.name}, namespace={self.namespace}, cluster={self.cluster}, "
            f"gvk={self.gvk}, manifest={self.manifest}, user_id={self.user_id})"
        )
        return description

    def to_api_object(self, api: Api) -> APIObject:
        """Build a kr8s API object carrying this object's manifest.

        A one-off ``APIObject`` subclass is declared for the object's kind. The singular name is
        the lower-cased kind, and the plural name and endpoint are that name with an "s" appended.
        NOTE(review): this naive pluralisation presumably only suits kinds whose plural is formed
        by adding "s" - confirm against the kinds actually passed through here.
        """
        gvk = self.gvk
        is_namespaced = self.namespaced
        lowered_kind = gvk.kind.lower()
        pluralized_kind = lowered_kind + "s"

        class _APIObj(APIObject):
            kind = gvk.kind
            version = gvk.group_version
            singular = lowered_kind
            plural = pluralized_kind
            endpoint = pluralized_kind
            namespaced = is_namespaced

        return _APIObj(resource=self.manifest, namespace=self.namespace, api=api)
107

108

109
class K8sSecret(K8sObject):
    """Represents a secret in k8s."""

    def __init__(
        self,
        name: str,
        namespace: str,
        cluster: ClusterId,
        gvk: GVK,
        manifest: Box,
        user_id: str | None = None,
        namespaced: bool = True,
    ) -> None:
        super().__init__(name, namespace, cluster, gvk, manifest, user_id, namespaced)

    def __repr__(self) -> str:
        # We hide the manifest to prevent leaking secrets
        return (
            f"{self.__class__.__name__}(name={self.name}, namespace={self.namespace}, cluster={self.cluster}, "
            f"gvk={self.gvk}, user_id={self.user_id})"
        )

    @classmethod
    def from_k8s_object(cls, k8s_object: K8sObject) -> K8sSecret:
        """Convert a k8s object to a K8sSecret object.

        All identifying fields of the source object are carried over, including ``user_id`` and
        ``namespaced``; the manifest is shared with the source object, not copied.
        """
        return K8sSecret(
            name=k8s_object.name,
            namespace=k8s_object.namespace,
            cluster=k8s_object.cluster,
            gvk=k8s_object.gvk,
            manifest=k8s_object.manifest,
            user_id=k8s_object.user_id,
            namespaced=k8s_object.namespaced,
        )

    @classmethod
    def from_v1_secret(cls, secret: V1Secret, cluster: ClusterConnection) -> K8sSecret:
        """Convert a V1Secret object to a K8sSecret object.

        The namespace and cluster id are taken from ``cluster``, not from the secret's own
        metadata. The secret is passed through the module-level ``sanitizer`` before being
        wrapped in a ``Box``.
        """
        assert secret.metadata is not None

        return K8sSecret(
            name=secret.metadata.name,
            namespace=cluster.namespace,
            cluster=cluster.id,
            gvk=GVK(group="core", version=Secret.version, kind="Secret"),
            manifest=Box(sanitizer(secret)),
        )

    def to_v1_secret(self) -> V1Secret:
        """Convert a K8sSecret to a V1Secret object.

        Missing ``data`` and ``stringData`` entries default to empty dicts; a missing ``type``
        is passed on as ``None``.
        """
        return V1Secret(
            metadata=self.manifest.metadata,
            data=self.manifest.get("data", {}),
            string_data=self.manifest.get("stringData", {}),
            type=self.manifest.get("type"),
        )

    def to_patch(self) -> list[dict[str, Any]]:
        """Create a rfc6902 patch that would take an existing secret and patch it to this state.

        The patch always replaces ``/data`` and ``/type`` (defaulting to an empty dict and
        "Opaque" respectively), and replaces labels, annotations and owner references only when
        they are present in this secret's metadata.

        Raises:
            NotImplementedError: if the manifest carries a non-empty ``stringData`` field.
        """
        if self.manifest.get("stringData"):
            raise NotImplementedError("Patching a secret with stringData field is not implemented.")
        patch: list[dict[str, Any]] = [
            # A manifest without 'data' describes a secret with no entries, same default as to_v1_secret().
            {"op": "replace", "path": "/data", "value": self.manifest.get("data", {})},
            {"op": "replace", "path": "/type", "value": self.manifest.get("type", "Opaque")},
        ]
        metadata = self.manifest.get("metadata")
        if metadata is None:
            return patch
        for field in ("labels", "annotations", "ownerReferences"):
            if field in metadata:
                patch.append({"op": "replace", "path": f"/metadata/{field}", "value": metadata[field]})
        # We never create 'finalizers' nor 'managedFields', so we do not patch them.
        return patch
188

189

190
@dataclass
class K8sObjectFilter:
    """Criteria for selecting resources from the cache or from k8s.

    Only ``gvk`` is mandatory; every other criterion defaults to ``None``.
    NOTE(review): ``None`` presumably means "do not filter on this field" - confirm with the
    code that consumes the filter.
    """

    # Group, version and kind of the resources to select.
    gvk: GVK
    # Name of the resource.
    name: str | None = None
    # Namespace of the resource.
    namespace: str | None = None
    # Id of the cluster holding the resource.
    cluster: ClusterId | None = None
    # Label names mapped to the values to select on.
    label_selector: dict[str, str] | None = None
    # Id of the user associated with the resource.
    user_id: str | None = None
200

201

202
@dataclass(frozen=True, eq=True, kw_only=True)
2✔
203
class ClusterConnection:
2✔
204
    """K8s Cluster wrapper."""
205

206
    id: ClusterId
2✔
207
    namespace: str
2✔
208
    api: Api
2✔
209

210
    def with_api_object(self, obj: APIObject) -> APIObjectInCluster:
2✔
211
        """Create an API object associated with the cluster."""
212
        return APIObjectInCluster(obj, self.id)
1✔
213

214

215
@dataclass(kw_only=True, frozen=True)
2✔
216
class GVK:
2✔
217
    """The information about the group, version and kind of a K8s object."""
218

219
    kind: str
2✔
220
    version: str
2✔
221
    group: str | None = None
2✔
222

223
    @property
2✔
224
    def group_version(self) -> str:
2✔
225
        """Get the group and version joined by '/'."""
226
        if self.group == "core" or self.group is None:
2✔
227
            return self.version
1✔
228
        return f"{self.group}/{self.version}"
2✔
229

230
    @property
2✔
231
    def kr8s_kind(self) -> str:
2✔
232
        """Returns the fully qualified kind string for this filter for kr8s.
233

234
        Note: This exists because kr8s has some methods where it only allows you to specify 'kind' and then has
235
        weird logic to split that. This method is essentially the reverse of the kr8s logic so we can hand it a
236
        string it will accept.
237
        """
238
        if self.group is None:
1✔
239
            # e.g. pod/v1
240
            return f"{self.kind.lower()}/{self.version}"
1✔
241
        # e.g. buildrun.shipwright.io/v1beta1
242
        return f"{self.kind.lower()}.{self.group_version}"
1✔
243

244
    @classmethod
2✔
245
    def from_kr8s_object(cls, kr8s_obj: type[APIObject] | APIObject) -> Self:
2✔
246
        """Extract GVK from a kr8s object."""
247
        if "/" in kr8s_obj.version:
1✔
248
            grp_version_split = kr8s_obj.version.split("/")
1✔
249
            grp = grp_version_split[0]
1✔
250
            version = grp_version_split[1]
1✔
251
        else:
252
            grp = None
1✔
253
            version = kr8s_obj.version
1✔
254
        return cls(
1✔
255
            kind=kr8s_obj.kind,
256
            group=grp,
257
            version=version,
258
        )
259

260

261
@dataclass
2✔
262
class APIObjectInCluster:
2✔
263
    """A kr8s k8s object from a specific cluster."""
264

265
    obj: APIObject
2✔
266
    cluster: ClusterId
2✔
267

268
    @property
2✔
269
    def user_id(self) -> str | None:
2✔
270
        """Extract the user id from annotations."""
271
        labels = cast(dict[str, str], self.obj.metadata.get("labels", {}))
1✔
272
        match self.obj.singular:
1✔
273
            case "jupyterserver":
1✔
UNCOV
274
                return labels.get("renku.io/userId", None)
×
275
            case "amaltheasession":
1✔
276
                return labels.get("renku.io/safe-username", None)
1✔
277
            case "buildrun":
1✔
278
                return labels.get("renku.io/safe-username", None)
×
279
            case "taskrun":
1✔
280
                return DUMMY_TASK_RUN_USER_ID
×
281
            case _:
1✔
282
                return None
1✔
283

284
    @property
2✔
285
    def meta(self) -> K8sObjectMeta:
2✔
286
        """Extract the metadata from an api object."""
287
        return K8sObjectMeta(
1✔
288
            name=self.obj.name,
289
            namespace=self.obj.namespace or "default",
290
            cluster=self.cluster,
291
            gvk=GVK.from_kr8s_object(self.obj),
292
            user_id=self.user_id,
293
        )
294

295
    def to_k8s_object(self) -> K8sObject:
2✔
296
        """Convert the api object to a regular k8s object."""
297
        if self.obj.name is None or self.obj.namespace is None:
1✔
298
            raise errors.ProgrammingError()
×
299
        return K8sObject(
1✔
300
            name=self.obj.name,
301
            namespace=self.obj.namespace,
302
            gvk=GVK.from_kr8s_object(self.obj),
303
            manifest=Box(self.obj.to_dict()),
304
            cluster=self.cluster,
305
            user_id=self.user_id,
306
        )
307

308
    @classmethod
2✔
309
    def from_k8s_object(cls, obj: K8sObject, api: Api) -> Self:
2✔
310
        """Convert a regular k8s object to an api object."""
311

312
        return cls(
×
313
            obj=obj.to_api_object(api),
314
            cluster=obj.cluster,
315
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc