• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 23204418307

17 Mar 2026 04:17PM UTC coverage: 87.145% (-0.06%) from 87.205%
23204418307

push

github

web-flow
fix: handle missing jwk (#1243)

1 of 2 new or added lines in 1 file covered. (50.0%)

23 existing lines in 9 files now uncovered.

24995 of 28682 relevant lines covered (87.15%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.33
/components/renku_data_services/resource_usage/core.py
1
"""Core functions for resource usage."""
2

3
from collections.abc import AsyncIterator
2✔
4
from datetime import UTC, date, datetime, timedelta
2✔
5
from typing import Protocol
2✔
6

7
from renku_data_services import errors
2✔
8
from renku_data_services.app_config import logging
2✔
9
from renku_data_services.k8s.client_interfaces import K8sClient
2✔
10
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
2✔
11
from renku_data_services.k8s.models import GVK, K8sObject, K8sObjectFilter, K8sObjectMeta
2✔
12
from renku_data_services.resource_usage import apispec
2✔
13
from renku_data_services.resource_usage.db import ResourceRequestsRepo
2✔
14
from renku_data_services.resource_usage.model import (
2✔
15
    Credit,
16
    ResourceClassCost,
17
    ResourceDataFacade,
18
    ResourcePoolLimits,
19
    ResourcePoolUsage,
20
    ResourcesRequest,
21
    ResourceUsageQuery,
22
    ResourceUsageSummary,
23
)
24

25
logger = logging.getLogger(__file__)
2✔
26

27

28
def validate_resource_pool_limit_put(id: int, body: apispec.ResourcePoolLimitPut) -> ResourcePoolLimits:
2✔
29
    """Validate resource pool limit."""
30
    if body.user_limit > body.total_limit:
1✔
31
        raise errors.ValidationError(
×
32
            message=f"The user_limit '{body.user_limit}' must be lower than total_limit '{body.total_limit}'.",
33
        )
34
    return ResourcePoolLimits(id, Credit.from_int(body.total_limit), Credit.from_int(body.user_limit))
1✔
35

36

37
def validate_resource_class_costs_put(class_id: int, body: apispec.ResourceClassCostPut) -> ResourceClassCost:
2✔
38
    """Validate the resource class cost data."""
39
    return ResourceClassCost(resource_class_id=class_id, cost=Credit.from_int(body.cost))
1✔
40

41

42
class ResourceRequestsFetchProto(Protocol):
2✔
43
    """Protocol defining methods for getting resource requests."""
44

45
    def get_resources_requests_iter(self, capture_interval: timedelta) -> AsyncIterator[ResourcesRequest]:
2✔
46
        """Iterating through resource requests."""
47
        ...
×
48

49

50
class ResourceRequestsFetch(ResourceRequestsFetchProto):
2✔
51
    """Get resource request data."""
52

53
    def __init__(self, k8s_client: K8sClient) -> None:
2✔
54
        self._client = k8s_client
×
55

56
    async def _get_node(
2✔
57
        self, node_name: str | None, pod: K8sObject, node_cache: dict[str, ResourceDataFacade]
58
    ) -> ResourceDataFacade | None:
59
        if node_name:
×
60
            node_obj = node_cache.get(node_name)
×
61
            if node_obj is not None:
×
62
                return node_obj
×
63
            else:
64
                pod_node = await self._client.get(
×
65
                    K8sObjectMeta(
66
                        name=node_name, namespace=pod.namespace, cluster=pod.cluster, gvk=GVK(kind="node", version="v1")
67
                    )
68
                )
69
                if pod_node:
×
70
                    node_obj = ResourceDataFacade(pod_node)
×
71
                    node_cache[node_name] = node_obj
×
72
                    return node_obj
×
73
        return None
×
74

75
    async def get_resources_requests_iter(self, capture_interval: timedelta) -> AsyncIterator[ResourcesRequest]:
2✔
76
        """Iterating through resource requests."""
77

78
        logger.debug("Get pods and pvc from all clusters")
×
79

80
        date = datetime.now(UTC).replace(microsecond=0)
×
81
        pod_filter = K8sObjectFilter(
×
82
            gvk=GVK(kind="Pod", version="v1"), label_selector={"app.kubernetes.io/name": "AmaltheaSession"}
83
        )
84
        pvc_filter = K8sObjectFilter(gvk=GVK(kind="PersistentVolumeClaim", version="v1"))
×
85
        node_cache: dict[str, ResourceDataFacade] = {}
×
86
        async for pod in self._client.list(pod_filter):
×
87
            obj = ResourceDataFacade(obj=pod)
×
88
            node_obj: ResourceDataFacade | None = None
×
89
            try:
×
90
                node_obj = await self._get_node(obj.node_name, pod, node_cache)
×
91
            except Exception as ex:
×
92
                logger.debug(f"Cannot get node (ignoring): {ex}", exc_info=ex)
×
93
                node_obj = None
×
94

95
            rreq = ResourcesRequest.from_pod_and_node(obj, node_obj, pod.cluster, date, capture_interval)
×
96
            await self._amend_session_fallback(pod.cluster, obj, rreq)
×
97
            yield rreq
×
98

99
        async for pvc in self._client.list(pvc_filter):
×
100
            obj = ResourceDataFacade(obj=pvc)
×
101
            rreq = ResourcesRequest.from_pvc(obj, pvc.cluster, date, capture_interval)
×
102
            await self._amend_session_fallback(pvc.cluster, obj, rreq)
×
103
            yield rreq
×
104

105
    async def _amend_session_fallback(
2✔
106
        self, cluster_id: ClusterId | None, obj: ResourceDataFacade, rreq: ResourcesRequest
107
    ) -> None:
108
        """Modifies the argument with data retrieved from the amalthea session if applicable."""
109

110
        if not rreq.user_id and obj.session_instance_id:
×
111
            ams = await self._client.get(
×
112
                K8sObjectMeta(
113
                    name=obj.session_instance_id,
114
                    namespace=obj.namespace,
115
                    cluster=cluster_id or DEFAULT_K8S_CLUSTER,
116
                    gvk=GVK(kind="AmaltheaSession", version="v1"),
117
                )
118
            )
119
            if ams:
×
120
                amsObj = ResourceDataFacade(ams)
×
121
                rreq.user_id = rreq.user_id or amsObj.user_id
×
122
                rreq.project_id = rreq.project_id or amsObj.project_id
×
123
                rreq.launcher_id = rreq.launcher_id or amsObj.launcher_id
×
124
                if not rreq.resource_class_id:
×
125
                    rreq.resource_class_id = amsObj.resource_class_id
×
126
                if not rreq.resource_pool_id:
×
127
                    rreq.resource_pool_id = amsObj.resource_pool_id
×
128

129

130
class ResourcesRequestRecorder(Protocol):
2✔
131
    """Methods for recording resource requests."""
132

133
    async def record_resource_requests(self, interval: timedelta) -> None:
2✔
134
        """Fetches all resource requests in the given namespace and stores them."""
135
        ...
×
136

137

138
class NoopResourcesRequestRecorder(ResourcesRequestRecorder):
2✔
139
    """No-op resource request recorder."""
140

141
    async def record_resource_requests(self, interval: timedelta) -> None:
2✔
142
        """Fetches all resource requests in the given namespace and stores them."""
143
        return None
×
144

145

146
class DefaultResourcesRequestRecorder(ResourcesRequestRecorder):
2✔
147
    """Methods for recording resource requests."""
148

149
    def __init__(self, repo: ResourceRequestsRepo, fetch: ResourceRequestsFetchProto) -> None:
2✔
150
        self._repo = repo
1✔
151
        self._fetch = fetch
1✔
152

153
    async def record_resource_requests(self, interval: timedelta) -> None:
2✔
154
        """Fetches all resource requests in the given namespace and stores them."""
155
        result: list[ResourcesRequest] = []  # await self._fetch.get_resources_requests(interval)
1✔
156
        async for item in self._fetch.get_resources_requests_iter(interval):
1✔
157
            result.append(item)
1✔
158
        size = len(result)
1✔
159
        if size == 0:
1✔
160
            logger.warning("No pod or pvc was found!")
1✔
161
        else:
162
            logger.info(f"Inserting {size} resource request records.")
1✔
163
        await self._repo.insert_many(result)
1✔
164

165

166
class ResourceUsageService:
2✔
167
    """Queries for resource usages."""
168

169
    def __init__(self, repo: ResourceRequestsRepo) -> None:
2✔
170
        self._repo = repo
2✔
171

172
    async def usage_of_running_week(
2✔
173
        self, resource_pool_id: int, user_id: str | None, current_time: datetime | None = None
174
    ) -> ResourceUsageSummary:
175
        """Return the resource usage for the given pool of the currently running week.
176

177
        The week start is Monday 0:00 UTC. Resource usage is returned in 'credits'. When a user_id
178
        is given, the results represent the usage of only that user. Otherwise the overall pool usage
179
        is returned. The running week is calculated from the `current_time` argument, which is the current
180
        time if not specified.
181
        """
182
        now = current_time.replace(tzinfo=UTC) if current_time is not None else datetime.now(UTC)
1✔
183
        start = (now - timedelta(days=now.weekday())).date()
1✔
184
        query = ResourceUsageQuery(since=start, until=now.date(), user_id=user_id, resource_pool_id=resource_pool_id)
1✔
185
        result = ResourceUsageSummary.empty()
1✔
186
        async for item in self._repo.find_usage(query):
1✔
187
            result = result.add(item)
1✔
188

189
        return result
1✔
190

191
    async def usage_of_timespan(
2✔
192
        self, resource_pool_id: int, user_id: str | None, start_date: date, end_date: date | None
193
    ) -> ResourceUsageSummary:
194
        """Return the resource usage for the given pool of the given timespan."""
195
        until = end_date or datetime.now(UTC).date()
×
196
        query = ResourceUsageQuery(since=start_date, until=until, user_id=user_id, resource_pool_id=resource_pool_id)
×
197
        result = ResourceUsageSummary.empty()
×
198
        async for item in self._repo.find_usage(query):
×
199
            result = result.add(item)
×
200

201
        return result
×
202

203
    async def get_running_week(
2✔
204
        self, resource_pool_id: int, user_id: str, current_time: datetime | None = None
205
    ) -> ResourcePoolUsage | None:
206
        """Get resource pool usage and its limits."""
207

208
        limits = await self._repo.find_resource_pool_limits(resource_pool_id)
1✔
209

210
        if limits:
1✔
211
            user_usage = await self.usage_of_running_week(resource_pool_id, user_id, current_time)
×
212
            total_usage = await self.usage_of_running_week(resource_pool_id, None, current_time)
×
213
            return ResourcePoolUsage(total_usage, user_usage, limits)
×
214
        else:
215
            return None
1✔
216

217
    async def get_for_date(
2✔
218
        self, resource_pool_id: int, user_id: str, start_date: date, end_date: date | None
219
    ) -> ResourcePoolUsage | None:
220
        """Get resource pool usage given a time span."""
221

UNCOV
222
        limits = await self._repo.find_resource_pool_limits(resource_pool_id)
×
UNCOV
223
        if limits:
×
224
            user_usage = await self.usage_of_timespan(resource_pool_id, user_id, start_date, end_date)
×
225
            total_usage = await self.usage_of_timespan(resource_pool_id, None, start_date, end_date)
×
226
            return ResourcePoolUsage(total_usage, user_usage, limits)
×
227
        else:
UNCOV
228
            return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc