SwissDataScienceCenter / renku-data-services — build 19171102871

07 Nov 2025 02:18PM UTC — coverage: 86.819% (-0.02% from 86.838%)
Pull Request #1102: ignore: test PR for codeql action update (merge 8d0671560 into b462e7159)
22849 of 26318 relevant lines covered (86.82%), 1.52 hits per line
Source File: /components/renku_data_services/k8s/watcher/core.py — 80.13% of relevant lines covered
Uncovered lines are mainly the error-handling branches, the cache-deletion paths, the wait() method, and the session-metric branches of collect_metrics().
"""K8s watcher main."""

from __future__ import annotations

import asyncio
import contextlib
from asyncio import CancelledError, Task
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta

from renku_data_services.app_config import logging
from renku_data_services.base_models.core import APIUser, InternalServiceAdmin, ServiceAdminId
from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.crc.db import ResourcePoolRepository
from renku_data_services.k8s.clients import K8sClusterClient
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
from renku_data_services.k8s.db import K8sDbCache
from renku_data_services.k8s.models import GVK, APIObjectInCluster, K8sObject, K8sObjectFilter
from renku_data_services.notebooks.crs import State

logger = logging.getLogger(__name__)


type EventHandler = Callable[[APIObjectInCluster, str], Awaitable[None]]
type SyncFunc = Callable[[], Awaitable[None]]

k8s_watcher_admin_user = InternalServiceAdmin(id=ServiceAdminId.k8s_watcher)


class K8sWatcher:
    """Watch k8s events and call the handler with every event."""

    def __init__(
        self,
        handler: EventHandler,
        clusters: dict[ClusterId, K8sClusterClient],
        kinds: list[GVK],
        db_cache: K8sDbCache,
    ) -> None:
        self.__handler = handler
        self.__watch_tasks: dict[ClusterId, list[Task]] = {}
        self.__full_sync_tasks: dict[ClusterId, Task] = {}
        self.__full_sync_times: dict[ClusterId, datetime] = {}
        self.__full_sync_running: set[ClusterId] = set()
        self.__kinds = kinds
        self.__clusters = clusters
        self.__sync_period_seconds = 600
        self.__cache = db_cache

    async def __sync(self, client: K8sClusterClient, kind: GVK, raise_exceptions: bool = False) -> None:
        """Upsert K8s objects in the cache and remove deleted objects from the cache."""

        fltr = K8sObjectFilter(gvk=kind, cluster=client.get_cluster().id, namespace=client.get_cluster().namespace)
        # Upsert new / updated objects
        objects_in_k8s: dict[str, K8sObject] = {}
        obj_iter = aiter(client.list(fltr))
        while True:
            try:
                obj = await anext(obj_iter)
            except StopAsyncIteration:
                break  # No more items to list
            except Exception as e:
                logger.error(f"Failed to list objects: {e}")
                if raise_exceptions:
                    raise e
            else:
                objects_in_k8s[obj.name] = obj
                await self.__cache.upsert(obj)

        cache_iter = aiter(self.__cache.list(fltr))
        while True:
            try:
                cache_obj = await anext(cache_iter)
            except StopAsyncIteration:
                break  # No more items to list
            except Exception as e:
                logger.error(f"Failed to list objects: {e}")
                if raise_exceptions:
                    raise e
            else:
                # Remove objects that have been deleted from k8s but are still in cache
                if objects_in_k8s.get(cache_obj.name) is None:
                    await self.__cache.delete(cache_obj)

    async def __full_sync(self, client: K8sClusterClient) -> None:
        """Run the full sync if it has never run or at the required interval."""
        cluster_id = client.get_cluster().id
        last_sync = self.__full_sync_times.get(cluster_id)
        since_last_sync = datetime.now() - last_sync if last_sync is not None else None
        if since_last_sync is not None and since_last_sync.total_seconds() < self.__sync_period_seconds:
            return
        self.__full_sync_running.add(cluster_id)
        for kind in self.__kinds:
            logger.info(f"Starting full k8s cache sync for cluster {cluster_id} and kind {kind}")
            await self.__sync(client, kind, cluster_id == DEFAULT_K8S_CLUSTER)
        self.__full_sync_times[cluster_id] = datetime.now()
        self.__full_sync_running.remove(cluster_id)

    async def __periodic_full_sync(self, client: K8sClusterClient) -> None:
        """Keeps trying to run the full sync."""
        while True:
            await self.__full_sync(client)
            await asyncio.sleep(self.__sync_period_seconds / 10)

    async def __watch_kind(self, kind: GVK, client: K8sClusterClient) -> None:
        logger.info(f"Watching kind {kind} for {client}")
        cluster = client.get_cluster()
        cluster_id = cluster.id
        while True:
            try:
                watch = cluster.api.async_watch(kind=kind.kr8s_kind, namespace=cluster.namespace)
                async for event_type, obj in watch:
                    if cluster_id in self.__full_sync_running:
                        logger.info(
                            f"Pausing k8s watch event processing for cluster {cluster} until full sync completes"
                        )
                    else:
                        await self.__handler(cluster.with_api_object(obj), event_type)
            except ValueError:
                pass
            except Exception as e:
                logger.error(f"watch loop failed for {kind} in cluster {cluster_id}", exc_info=e)

            # Add a sleep to prevent retrying in a loop the same action instantly.
            await asyncio.sleep(10)

    def __run_single(self, client: K8sClusterClient) -> list[Task]:
        # The loops and error handling here will need some testing and love
        tasks = []
        for kind in self.__kinds:
            logger.info(f"watching {kind} in cluster {client.get_cluster().id}")
            tasks.append(asyncio.create_task(self.__watch_kind(kind, client)))

        return tasks

    async def start(self) -> None:
        """Start the watcher."""
        for cluster_id in sorted(self.__clusters.keys()):
            if (client := self.__clusters.get(cluster_id)) is not None:
                await self.__full_sync(client)
                self.__full_sync_tasks[cluster_id] = asyncio.create_task(self.__periodic_full_sync(client))
                self.__watch_tasks[cluster_id] = self.__run_single(client)

    async def wait(self) -> None:
        """Wait for all tasks.

        This is mainly used to block the main function.
        """
        all_tasks = list(self.__full_sync_tasks.values())
        for tasks in self.__watch_tasks.values():
            all_tasks.extend(tasks)
        await asyncio.gather(*all_tasks)

    async def stop(self, timeout: timedelta = timedelta(seconds=10)) -> None:
        """Stop the watcher or timeout."""

        async def stop_task(task: Task, timeout: timedelta) -> None:
            if task.done():
                return
            task.cancel()
            try:
                async with asyncio.timeout(timeout.total_seconds()):
                    with contextlib.suppress(CancelledError):
                        await task
            except TimeoutError:
                logger.error("timeout trying to cancel k8s watcher task")
                return

        for task_list in self.__watch_tasks.values():
            for task in task_list:
                await stop_task(task, timeout)
        for task in self.__full_sync_tasks.values():
            await stop_task(task, timeout)


async def collect_metrics(
    previous_obj: K8sObject | None,
    new_obj: APIObjectInCluster,
    event_type: str,
    user_id: str,
    metrics: MetricsService,
    rp_repo: ResourcePoolRepository,
) -> None:
    """Track product metrics."""
    user = APIUser(id=user_id)

    if event_type == "DELETED":
        # session stopping
        await metrics.session_stopped(user=user, metadata={"session_id": new_obj.meta.name})
        return
    previous_state = previous_obj.manifest.get("status", {}).get("state", None) if previous_obj else None
    match new_obj.obj.status.get("state"):
        case State.Running.value if previous_state is None or previous_state == State.NotReady.value:
            # session starting
            resource_class_id = int(new_obj.obj.metadata.annotations.get("renku.io/resource_class_id"))
            resource_pool = await rp_repo.get_resource_pool_from_class(k8s_watcher_admin_user, resource_class_id)
            resource_class = await rp_repo.get_resource_class(k8s_watcher_admin_user, resource_class_id)

            await metrics.session_started(
                user=user,
                metadata={
                    "cpu": int(resource_class.cpu * 1000),
                    "memory": resource_class.memory,
                    "gpu": resource_class.gpu,
                    "storage": new_obj.obj.spec.session.storage.size,
                    "resource_class_id": resource_class_id,
                    "resource_pool_id": resource_pool.id or "",
                    "resource_class_name": f"{resource_pool.name}.{resource_class.name}",
                    "session_id": new_obj.meta.name,
                },
            )
        case State.Running.value | State.NotReady.value if previous_state == State.Hibernated.value:
            # session resumed
            await metrics.session_resumed(user, metadata={"session_id": new_obj.meta.name})
        case State.Hibernated.value if previous_state != State.Hibernated.value:
            # session hibernated
            await metrics.session_hibernated(user=user, metadata={"session_id": new_obj.meta.name})
        case _:
            pass


def k8s_object_handler(cache: K8sDbCache, metrics: MetricsService, rp_repo: ResourcePoolRepository) -> EventHandler:
    """Listens and to k8s events and updates the cache."""

    async def handler(obj: APIObjectInCluster, event_type: str) -> None:
        existing = await cache.get(obj.meta)
        if obj.user_id is not None:
            try:
                await collect_metrics(existing, obj, event_type, obj.user_id, metrics, rp_repo)
            except Exception as e:
                logger.error("failed to track product metrics", exc_info=e)
        if event_type == "DELETED":
            await cache.delete(obj.meta)
            return
        k8s_object = obj.to_k8s_object()
        k8s_object.user_id = obj.user_id
        await cache.upsert(k8s_object)

    return handler
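
For reference, the sketch below shows one way the pieces above could be wired together: k8s_object_handler() builds the EventHandler that K8sWatcher drives, and start()/wait()/stop() manage the background sync and watch tasks. It is only a minimal sketch: run_watcher is a hypothetical helper, not part of the module, the import path is assumed from the file location shown above, and the cluster clients, cache, metrics service, and resource-pool repository are assumed to be constructed elsewhere by the service's startup code.

from datetime import timedelta

from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.crc.db import ResourcePoolRepository
from renku_data_services.k8s.clients import K8sClusterClient
from renku_data_services.k8s.constants import ClusterId
from renku_data_services.k8s.db import K8sDbCache
from renku_data_services.k8s.models import GVK
from renku_data_services.k8s.watcher.core import K8sWatcher, k8s_object_handler


async def run_watcher(
    clusters: dict[ClusterId, K8sClusterClient],
    db_cache: K8sDbCache,
    metrics: MetricsService,
    rp_repo: ResourcePoolRepository,
    kinds: list[GVK],
) -> None:
    """Hypothetical wiring; all dependencies are assumed to be built by the caller."""
    # The handler keeps the DB cache in sync with cluster events and records product metrics.
    handler = k8s_object_handler(db_cache, metrics, rp_repo)
    watcher = K8sWatcher(handler=handler, clusters=clusters, kinds=kinds, db_cache=db_cache)
    try:
        # Runs an initial full sync per cluster, then spawns the periodic-sync and watch tasks.
        await watcher.start()
        # Blocks on the background tasks; under normal operation they run forever.
        await watcher.wait()
    finally:
        # Cancels every task, allowing up to the timeout for each one to finish.
        await watcher.stop(timeout=timedelta(seconds=10))

# Typically driven from the service entry point, e.g. asyncio.run(run_watcher(...)).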