• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 14382014257

10 Apr 2025 01:42PM UTC coverage: 86.576% (+0.2%) from 86.351%
14382014257

Pull #759

github

web-flow
Merge 470ff1568 into 74eb7d965
Pull Request #759: feat: add new service cache and migrations

412 of 486 new or added lines in 15 files covered. (84.77%)

18 existing lines in 6 files now uncovered.

20232 of 23369 relevant lines covered (86.58%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.69
/components/renku_data_services/session/k8s_client.py
1
"""An abstraction over the kr8s kubernetes client and the k8s-watcher."""
2

3
from urllib.parse import urljoin
2✔
4

5
import httpx
2✔
6
from kr8s import NotFoundError, ServerError
2✔
7
from kr8s.asyncio.objects import APIObject, Pod
2✔
8
from kubernetes.client import ApiClient
2✔
9
from sanic.log import logger
2✔
10

11
from renku_data_services import errors
2✔
12
from renku_data_services.errors.errors import CannotStartBuildError
2✔
13
from renku_data_services.notebooks.errors.intermittent import CacheError, IntermittentError
2✔
14
from renku_data_services.notebooks.errors.programming import ProgrammingError
2✔
15
from renku_data_services.notebooks.util.retries import retry_with_exponential_backoff_async
2✔
16
from renku_data_services.session import crs, models
2✔
17
from renku_data_services.session.crs import BuildRun, TaskRun
2✔
18

19

20
# NOTE: The kr8s library has no type stubs; its maintainers claim pyright handles its type hints better.
21
class ShipwrightBuildRunV1Beta1Kr8s(APIObject):
    """kr8s resource definition for Shipwright ``BuildRun`` objects (API ``shipwright.io/v1beta1``).

    Used by the k8s client to get, list, create, patch and delete BuildRuns.
    """

    # Identity of the resource on the k8s API.
    version: str = "shipwright.io/v1beta1"
    kind: str = "BuildRun"
    singular: str = "buildrun"
    plural: str = "buildruns"
    endpoint: str = "buildruns"

    # Capabilities of the resource.
    namespaced: bool = True
    scalable: bool = False
2✔
31

32

33
# NOTE: The kr8s library has no type stubs; its maintainers claim pyright handles its type hints better.
34
class TektonTaskRunV1Kr8s(APIObject):
    """kr8s resource definition for Tekton ``TaskRun`` objects (API ``tekton.dev/v1``).

    Used by the k8s client to read the TaskRun behind a Shipwright BuildRun.
    """

    # Identity of the resource on the k8s API.
    version: str = "tekton.dev/v1"
    kind: str = "TaskRun"
    singular: str = "taskrun"
    plural: str = "taskruns"
    endpoint: str = "taskruns"

    # Capabilities of the resource.
    namespaced: bool = True
    scalable: bool = False
2✔
44

45

46
class _ShipwrightClientBase:
    """Client for managing Shipwright resources in kubernetes.

    Every operation targets the namespace given at construction time.

    NOTE: This does not apply any authentication or authorization on the requests.
    """

    def __init__(self, namespace: str):
        self.namespace = namespace
        self.sanitize = ApiClient().sanitize_for_serialization

    async def create_build_run(self, manifest: BuildRun) -> BuildRun:
        """Create a new Shipwright BuildRun.

        The manifest's ``metadata.namespace`` is overwritten with this client's namespace
        before the object is submitted.

        Returns:
            The BuildRun as read back from the k8s API after creation.

        Raises:
            CannotStartBuildError: If the k8s API rejects the creation, or the created
                BuildRun cannot be read back.
        """
        manifest.metadata.namespace = self.namespace
        build_run = await ShipwrightBuildRunV1Beta1Kr8s(manifest.model_dump(exclude_none=True, mode="json"))
        build_run_name = manifest.metadata.name
        try:
            await build_run.create()
        except ServerError as e:
            logger.exception(f"Cannot create the image build {build_run_name} because of {e}")
            raise CannotStartBuildError(message=f"Cannot create the image build {build_run_name}") from e
        await build_run.refresh()
        # NOTE(review): assumes the predicate given to the retry helper marks results that should
        # be retried, i.e. the read is retried while it still returns None.
        build_resource = await retry_with_exponential_backoff_async(lambda x: x is None)(self.get_build_run)(
            build_run_name
        )
        if build_resource is None:
            raise CannotStartBuildError(message=f"Cannot create the image build {build_run_name}")
        return build_resource

    async def get_build_run(self, name: str) -> BuildRun | None:
        """Get a Shipwright BuildRun.

        Returns:
            The BuildRun, or None if it does not exist or the k8s API answers 400/404.

        Raises:
            IntermittentError: On any other k8s API server error.
        """
        try:
            build = await ShipwrightBuildRunV1Beta1Kr8s.get(name=name, namespace=self.namespace)
        except NotFoundError:
            return None
        except ServerError as e:
            if not e.response or e.response.status_code not in [400, 404]:
                logger.exception(f"Cannot get the build {name} because of {e}")
                raise IntermittentError(f"Cannot get build {name} from the k8s API.") from e
            return None
        return BuildRun.model_validate(build.to_dict())

    async def list_build_runs(self, label_selector: str | None = None) -> list[BuildRun]:
        """Get a list of Shipwright BuildRuns, optionally filtered by a k8s label selector.

        Returns:
            The matching BuildRuns; an empty list if the k8s API answers 400/404.

        Raises:
            IntermittentError: On any other k8s API server error.
        """
        try:
            builds = await ShipwrightBuildRunV1Beta1Kr8s.list(namespace=self.namespace, label_selector=label_selector)
            output = [BuildRun.model_validate(b.to_dict()) for b in builds]
        except ServerError as e:
            if not e.response or e.response.status_code not in [400, 404]:
                logger.exception(f"Cannot list builds because of {e}")
                raise IntermittentError("Cannot list builds") from e
            return []
        return output

    async def delete_build_run(self, name: str) -> None:
        """Delete a Shipwright BuildRun with foreground propagation.

        Server errors are logged and swallowed (best-effort deletion).
        """
        build = await ShipwrightBuildRunV1Beta1Kr8s(dict(metadata=dict(name=name, namespace=self.namespace)))
        try:
            await build.delete(propagation_policy="Foreground")
        except ServerError as e:
            logger.exception(f"Cannot delete build {name} because of {e}")
        return None

    async def cancel_build_run(self, name: str) -> BuildRun:
        """Cancel a Shipwright BuildRun by patching ``spec.state`` to ``BuildRunCanceled``.

        Returns:
            The BuildRun built from the kr8s object after the patch.
        """
        build = await ShipwrightBuildRunV1Beta1Kr8s.get(name=name, namespace=self.namespace)
        await build.patch({"spec": {"state": "BuildRunCanceled"}})
        return BuildRun.model_validate(build.to_dict())

    async def get_task_run(self, name: str) -> TaskRun | None:
        """Get a Tekton TaskRun.

        Returns:
            The TaskRun, or None if it does not exist or the k8s API answers 400/404.

        Raises:
            IntermittentError: On any other k8s API server error.
        """
        try:
            task = await TektonTaskRunV1Kr8s.get(name=name, namespace=self.namespace)
        except NotFoundError:
            return None
        except ServerError as e:
            if not e.response or e.response.status_code not in [400, 404]:
                logger.exception(f"Cannot get the task run {name} because of {e}")
                raise IntermittentError(f"Cannot get task run {name} from the k8s API.") from e
            return None
        return TaskRun.model_validate(task.to_dict())

    async def get_pod_logs(self, name: str, max_log_lines: int | None = None) -> dict[str, str]:
        """Get the logs of all containers (including init containers) in a given pod.

        Args:
            name: The pod name.
            max_log_lines: If set, only the last ``max_log_lines`` lines per container are fetched.

        Returns:
            A mapping from container name to its log text. Containers whose logs cannot be
            read yet are left out.

        Raises:
            errors.MissingResourceError: If the pod does not exist.
        """
        # The pod lookup itself must also map "not found" to MissingResourceError; otherwise a
        # missing pod surfaces as a raw kr8s NotFoundError to callers.
        try:
            pod = await Pod.get(name=name, namespace=self.namespace)
        except NotFoundError as err:
            raise errors.MissingResourceError(message=f"The pod {name} does not exist.") from err
        logs: dict[str, str] = {}
        containers = [container.name for container in pod.spec.containers + pod.spec.get("initContainers", [])]
        for container in containers:
            try:
                # NOTE: calling pod.logs without a container name set crashes the library
                clogs: list[str] = [clog async for clog in pod.logs(container=container, tail_lines=max_log_lines)]
            except httpx.ResponseNotRead:
                # NOTE: This occurs when the container is still starting but we try to read its logs
                continue
            except NotFoundError as err:
                raise errors.MissingResourceError(message=f"The pod {name} does not exist.") from err
            except ServerError as err:
                if err.response is not None and err.response.status_code == 404:
                    raise errors.MissingResourceError(message=f"The pod {name} does not exist.") from err
                raise
            logs[container] = "\n".join(clogs)
        return logs
×
148

149

150
class _ShipwrightCache:
    """Utility class for calling the Shipwright k8s cache.

    A cache that cannot be reached (``httpx.RequestError``) and any non-200 answer are both
    reported as ``CacheError``.
    """

    def __init__(self, url: str):
        self.url = url
        self.client = httpx.AsyncClient(timeout=10)

    async def _get(self, path: str, action: str, kind: str) -> httpx.Response:
        """Issue a GET for ``path`` against the cache and return the (status 200) response.

        ``action`` (e.g. "Reading build run") and ``kind`` ("shipwright" or "tekton") only
        shape the log and error messages.
        """
        target = urljoin(self.url, path)
        try:
            response = await self.client.get(target, timeout=10)
        except httpx.RequestError as err:
            logger.warning(f"{kind.capitalize()} k8s cache at {target} cannot be reached: {err}")
            raise CacheError(f"The {kind} k8s cache is not available")
        if response.status_code == 200:
            return response
        logger.warning(
            f"{action} at {target} from "
            f"{kind} k8s cache failed with status code: {response.status_code} "
            f"and body: {response.text}"
        )
        raise CacheError(f"The K8s Cache produced an unexpected status code: {response.status_code}")

    async def list_build_runs(self) -> list[BuildRun]:
        """Get a list of Shipwright BuildRuns."""
        response = await self._get("/buildruns", "Listing build runs", "shipwright")
        return [BuildRun.model_validate(item) for item in response.json()]

    async def get_build_run(self, name: str) -> BuildRun | None:
        """Get a Shipwright BuildRun; None when the cache returns no match."""
        response = await self._get(f"/buildruns/{name}", "Reading build run", "shipwright")
        matches = response.json()
        count = len(matches)
        if count > 1:
            raise ProgrammingError(
                message=f"Expected to find 1 build run when getting run {name}, found {count}."
            )
        return BuildRun.model_validate(matches[0]) if count == 1 else None

    async def get_task_run(self, name: str) -> TaskRun | None:
        """Get a Tekton TaskRun; None when the cache returns no match."""
        response = await self._get(f"/taskruns/{name}", "Reading task run", "tekton")
        matches = response.json()
        count = len(matches)
        if count > 1:
            raise ProgrammingError(message=f"Expected to find 1 task run when getting run {name}, found {count}.")
        return TaskRun.model_validate(matches[0]) if count == 1 else None
×
220

221

222
class ShipwrightClient:
    """The K8s client that combines a base client and a cache.

    BuildRun and TaskRun reads go through the k8s cache. When the cache is unavailable they
    fall back to the k8s API only if ``skip_cache_if_unavailable`` is set. Creating, deleting
    and cancelling BuildRuns, as well as reading pod logs, always go to the k8s API.

    No authentication or authorization is performed - this is the responsibility of the caller.
    """

    def __init__(
        self,
        namespace: str,
        cache_url: str,
        # NOTE: If cache skipping is enabled then when the cache fails a large number of
        # buildruns can overload the k8s API by submitting a lot of calls directly.
        skip_cache_if_unavailable: bool = False,
    ) -> None:
        self.cache = _ShipwrightCache(url=cache_url)
        self.base_client = _ShipwrightClientBase(namespace=namespace)
        self.skip_cache_if_unavailable = skip_cache_if_unavailable

    async def list_build_runs(self) -> list[BuildRun]:
        """Get a list of Shipwright BuildRuns.

        Raises:
            CacheError: If the cache is unavailable and skipping it is disabled.
        """
        try:
            return await self.cache.list_build_runs()
        except CacheError:
            if not self.skip_cache_if_unavailable:
                raise
            logger.warning("Skipping the cache to list BuildRuns")
            return await self.base_client.list_build_runs()

    async def get_build_run(self, name: str) -> BuildRun | None:
        """Get a Shipwright BuildRun, or None if it does not exist.

        Raises:
            CacheError: If the cache is unavailable and skipping it is disabled.
        """
        try:
            return await self.cache.get_build_run(name)
        except CacheError:
            if not self.skip_cache_if_unavailable:
                raise
            # Log every direct k8s API call, as list_build_runs does, so that fallback load is visible.
            logger.warning(f"Skipping the cache to get BuildRun {name}")
            return await self.base_client.get_build_run(name)

    async def create_build_run(self, manifest: BuildRun) -> BuildRun:
        """Create a new Shipwright BuildRun (directly on the k8s API)."""
        return await self.base_client.create_build_run(manifest)

    async def delete_build_run(self, name: str) -> None:
        """Delete a Shipwright BuildRun (directly on the k8s API)."""
        return await self.base_client.delete_build_run(name)

    async def cancel_build_run(self, name: str) -> BuildRun:
        """Cancel a Shipwright BuildRun (directly on the k8s API)."""
        return await self.base_client.cancel_build_run(name)

    async def get_task_run(self, name: str) -> TaskRun | None:
        """Get a Tekton TaskRun, or None if it does not exist.

        Raises:
            CacheError: If the cache is unavailable and skipping it is disabled.
        """
        try:
            return await self.cache.get_task_run(name)
        except CacheError:
            if not self.skip_cache_if_unavailable:
                raise
            # Log every direct k8s API call, as list_build_runs does, so that fallback load is visible.
            logger.warning(f"Skipping the cache to get TaskRun {name}")
            return await self.base_client.get_task_run(name)

    async def create_image_build(self, params: models.ShipwrightBuildRunParams) -> None:
        """Create a new BuildRun in Shipwright to support a newly created build.

        Durations (build timeout and retention TTLs) are sent as whole seconds, e.g. ``"3600s"``.
        Unset durations, and durations that truncate to zero seconds, are omitted.
        """
        metadata = crs.Metadata(name=params.name)
        if params.annotations:
            metadata.annotations = params.annotations
        if params.labels:
            metadata.labels = params.labels

        retention: crs.Retention | None = None
        if params.retention_after_failed or params.retention_after_succeeded:
            retention_after_failed = (
                int(params.retention_after_failed.total_seconds()) if params.retention_after_failed else None
            )
            retention_after_succeeded = (
                int(params.retention_after_succeeded.total_seconds()) if params.retention_after_succeeded else None
            )
            retention = crs.Retention(
                ttlAfterFailed=f"{retention_after_failed}s" if retention_after_failed else None,
                ttlAfterSucceeded=f"{retention_after_succeeded}s" if retention_after_succeeded else None,
            )

        # Whole seconds, formatted the same way as the retention TTLs above (not "3600.0s").
        build_timeout = int(params.build_timeout.total_seconds()) if params.build_timeout else None

        build_run = BuildRun(
            metadata=metadata,
            spec=crs.BuildRunSpec(
                build=crs.Build(
                    spec=crs.BuildSpec(
                        source=crs.GitSource(git=crs.Git(url=params.git_repository)),
                        strategy=crs.Strategy(kind="BuildStrategy", name=params.build_strategy_name),
                        paramValues=[crs.ParamValue(name="run-image", value=params.run_image)],
                        output=crs.BuildOutput(
                            image=params.output_image,
                            pushSecret=params.push_secret_name,
                        ),
                        timeout=f"{build_timeout}s" if build_timeout else None,
                        nodeSelector=params.node_selector,
                        tolerations=params.tolerations,
                    )
                ),
                retention=retention,
            ),
        )
        await self.create_build_run(build_run)

    async def update_image_build_status(self, buildrun_name: str) -> models.ShipwrightBuildStatusUpdate:
        """Update the status of a build by pulling the corresponding BuildRun from Shipwright.

        Returns:
            - a ``failed`` update if the BuildRun no longer exists;
            - an update of ``None`` while the BuildRun has no status or completion time yet;
            - a ``succeeded`` update with the build result if the "Succeeded" condition is "True";
            - otherwise a ``failed`` update carrying the condition's reason (if any).
        """
        k8s_build = await self.get_build_run(name=buildrun_name)

        if k8s_build is None:
            return models.ShipwrightBuildStatusUpdate(
                update=models.ShipwrightBuildStatusUpdateContent(status=models.BuildStatus.failed)
            )

        k8s_build_status = k8s_build.status
        completion_time = k8s_build_status.completionTime if k8s_build_status else None

        if k8s_build_status is None or completion_time is None:
            return models.ShipwrightBuildStatusUpdate(update=None)

        conditions = k8s_build_status.conditions
        condition = next(filter(lambda c: c.type == "Succeeded", conditions or []), None)

        build_spec = k8s_build_status.buildSpec
        output = build_spec.output if build_spec else None
        result_image = output.image if output else "unknown"

        spec_source = build_spec.source if build_spec else None
        spec_git = spec_source.git if spec_source else None
        result_repository_url = spec_git.url if spec_git else "unknown"

        # The resolved commit is reported in the status' source, not in the build spec.
        status_source = k8s_build_status.source
        status_git = status_source.git if status_source else None
        result_repository_git_commit_sha = (status_git.commitSha if status_git else None) or "unknown"

        if condition is not None and condition.status == "True":
            return models.ShipwrightBuildStatusUpdate(
                update=models.ShipwrightBuildStatusUpdateContent(
                    status=models.BuildStatus.succeeded,
                    completed_at=completion_time,
                    result=models.BuildResult(
                        completed_at=completion_time,
                        image=result_image,
                        repository_url=result_repository_url,
                        repository_git_commit_sha=result_repository_git_commit_sha,
                    ),
                )
            )
        return models.ShipwrightBuildStatusUpdate(
            update=models.ShipwrightBuildStatusUpdateContent(
                status=models.BuildStatus.failed,
                completed_at=completion_time,
                error_reason=condition.reason if condition is not None else None,
            )
        )

    async def get_image_build_logs(self, buildrun_name: str, max_log_lines: int | None = None) -> dict[str, str]:
        """Get the logs from a Shipwright BuildRun.

        Follows BuildRun -> TaskRun -> pod and returns that pod's per-container logs.

        Raises:
            errors.MissingResourceError: If the BuildRun, its TaskRun or its pod cannot be found.
        """
        buildrun = await self.get_build_run(name=buildrun_name)
        if not buildrun:
            raise errors.MissingResourceError(message=f"Cannot find buildrun {buildrun_name} to retrieve logs.")
        status = buildrun.status
        task_run_name = status.taskRunName if status else None
        if not task_run_name:
            raise errors.MissingResourceError(
                message=f"The buildrun {buildrun_name} has no taskrun to retrieve logs from."
            )
        taskrun = await self.get_task_run(name=task_run_name)
        if not taskrun:
            raise errors.MissingResourceError(
                message=f"Cannot find taskrun from buildrun {buildrun_name} to retrieve logs."
            )
        pod_name = taskrun.status.podName if taskrun.status else None
        if not pod_name:
            raise errors.MissingResourceError(message=f"The buildrun {buildrun_name} has no pod to retrieve logs from.")
        return await self.base_client.get_pod_logs(name=pod_name, max_log_lines=max_log_lines)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc