• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 17240768421

26 Aug 2025 02:10PM UTC coverage: 87.15% (-0.03%) from 87.181%
17240768421

Pull #995

github

web-flow
Merge 4e66606ab into 86bb5f056
Pull Request #995: feat: upgrade the apispec and generate code

10 of 10 new or added lines in 1 file covered. (100.0%)

11 existing lines in 4 files now uncovered.

21751 of 24958 relevant lines covered (87.15%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/components/renku_data_services/k8s_watcher/db.py
1
"""K8s watcher database and k8s wrappers."""
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import AsyncIterable, Callable
2✔
6

7
import sqlalchemy
2✔
8
from sqlalchemy import Select, bindparam, select
2✔
9
from sqlalchemy.dialects.postgresql import JSONB
2✔
10
from sqlalchemy.ext.asyncio import AsyncSession
2✔
11

12
from renku_data_services.errors import errors
2✔
13
from renku_data_services.k8s.models import K8sObject, K8sObjectFilter, K8sObjectMeta
2✔
14
from renku_data_services.k8s_watcher.orm import K8sObjectORM
2✔
15

16

17
class K8sDbCache:
2✔
18
    """Caching k8s objects in postgres."""
19

20
    def __init__(self, session_maker: Callable[..., AsyncSession]) -> None:
2✔
21
        self.__session_maker = session_maker
2✔
22

23
    @staticmethod
2✔
24
    def __get_where_clauses(_filter: K8sObjectFilter) -> Select[tuple[K8sObjectORM]]:
2✔
25
        stmt = select(K8sObjectORM)
1✔
26
        if _filter.name is not None:
1✔
27
            stmt = stmt.where(K8sObjectORM.name == _filter.name)
1✔
28
        if _filter.namespace is not None:
1✔
29
            stmt = stmt.where(K8sObjectORM.namespace == _filter.namespace)
1✔
30
        if _filter.cluster is not None:
1✔
31
            stmt = stmt.where(K8sObjectORM.cluster == str(_filter.cluster))
1✔
32
        if _filter.gvk is not None:
1✔
33
            stmt = stmt.where(K8sObjectORM.kind_insensitive == _filter.gvk.kind)
1✔
34
            stmt = stmt.where(K8sObjectORM.version_insensitive == _filter.gvk.version)
1✔
35
            if _filter.gvk.group is None:
1✔
36
                stmt = stmt.where(K8sObjectORM.group.is_(None))
×
37
            else:
38
                stmt = stmt.where(K8sObjectORM.group_insensitive == _filter.gvk.group)
1✔
39
        if _filter.user_id is not None:
1✔
40
            stmt = stmt.where(K8sObjectORM.user_id == _filter.user_id)
1✔
41
        if _filter.label_selector is not None:
1✔
42
            stmt = stmt.where(
1✔
43
                # K8sObjectORM.manifest.comparator.contains({"metadata": {"labels": filter.label_selector}})
44
                sqlalchemy.text("manifest -> 'metadata' -> 'labels' @> :labels").bindparams(
45
                    bindparam("labels", _filter.label_selector, type_=JSONB)
46
                )
47
            )
48
        return stmt
1✔
49

50
    async def __get(self, meta: K8sObjectMeta, session: AsyncSession) -> K8sObjectORM | None:
2✔
51
        stmt = self.__get_where_clauses(meta.to_filter())
1✔
52
        obj_orm = await session.scalar(stmt)
1✔
53
        return obj_orm
1✔
54

55
    async def upsert(self, obj: K8sObject) -> None:
2✔
56
        """Insert or update an object in the cache."""
57
        if obj.user_id is None:
1✔
58
            raise errors.ValidationError(message="user_id is required to upsert k8s object.")
×
59
        async with self.__session_maker() as session, session.begin():
1✔
60
            obj_orm = await self.__get(obj, session)
1✔
61
            if obj_orm is not None:
1✔
62
                obj_orm.manifest = obj.manifest
1✔
63
                await session.commit()
1✔
64
                await session.flush()
1✔
65
                return
1✔
66
            obj_orm = K8sObjectORM(
1✔
67
                name=obj.name,
68
                namespace=obj.namespace or "default",
69
                group=obj.gvk.group,
70
                kind=obj.gvk.kind,
71
                version=obj.gvk.version,
72
                manifest=obj.manifest.to_dict(),
73
                cluster=str(obj.cluster),
74
                user_id=obj.user_id,
75
            )
76
            session.add(obj_orm)
1✔
77
            await session.commit()
1✔
78
            await session.flush()
1✔
79
            return
1✔
80

81
    async def delete(self, meta: K8sObjectMeta) -> None:
2✔
82
        """Delete an object from the cache."""
UNCOV
83
        async with self.__session_maker() as session, session.begin():
×
UNCOV
84
            obj_orm = await self.__get(meta, session)
×
UNCOV
85
            if obj_orm is not None:
×
UNCOV
86
                await session.delete(obj_orm)
×
87

88
    async def get(self, meta: K8sObjectMeta) -> K8sObject | None:
2✔
89
        """Get a single object from the cache."""
90
        async with self.__session_maker() as session, session.begin():
1✔
91
            obj_orm = await self.__get(meta, session)
1✔
92
            if obj_orm is not None:
1✔
93
                return meta.with_manifest(obj_orm.manifest)
1✔
94

95
        return None
1✔
96

97
    async def list(self, _filter: K8sObjectFilter) -> AsyncIterable[K8sObject]:
2✔
98
        """List objects from the cache."""
99
        async with self.__session_maker() as session, session.begin():
1✔
100
            stmt = self.__get_where_clauses(_filter)
1✔
101
            async for res in await session.stream_scalars(stmt):
1✔
102
                yield res.dump()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc