• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 5388199646

27 Jun 2023 09:35AM UTC coverage: 88.917% (+0.4%) from 88.567%
5388199646

push

gihub-action

web-flow
chore: changes for Postgres and Helm (#9)

Co-authored-by: Johann-Michael Thiebaut <johann.thiebaut@gmail.com>
Co-authored-by: Alessandro Degano <40891147+aledegano@users.noreply.github.com>
Co-authored-by: Ralf Grubenmann <ralf.grubenmann@protonmail.com>

654 of 654 new or added lines in 15 files covered. (100.0%)

1428 of 1606 relevant lines covered (88.92%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.77
/src/k8s/clients.py
1
"""Different implementations of k8s clients."""
1✔
2
from copy import deepcopy
1✔
3
from multiprocessing import Lock
1✔
4
from typing import Any, Dict
1✔
5
from uuid import uuid4
1✔
6

7
from kubernetes import client, config
1✔
8
from kubernetes.config.config_exception import ConfigException
1✔
9
from kubernetes.config.incluster_config import SERVICE_CERT_FILENAME, SERVICE_TOKEN_FILENAME, InClusterConfigLoader
1✔
10

11
from k8s.client_interfaces import K8sCoreClientInterface, K8sSchedudlingClientInterface
1✔
12

13

14
class K8sCoreClient(K8sCoreClientInterface):  # pragma:nocover
1✔
15
    """Real k8s core API client that exposes the required functions."""
1✔
16

17
    def __init__(self):
1✔
18
        try:
19
            InClusterConfigLoader(
20
                token_filename=SERVICE_TOKEN_FILENAME,
21
                cert_filename=SERVICE_CERT_FILENAME,
22
            ).load_and_set()
23
        except ConfigException:
24
            config.load_config()
25
        self.client = client.CoreV1Api()
26

27
    def read_namespaced_resource_quota(self, name: Any, namespace: Any, **kwargs: Any) -> Any:
1✔
28
        """Get a resource quota."""
29
        return self.client.read_namespaced_resource_quota(name, namespace, **kwargs)
30

31
    def list_namespaced_resource_quota(self, namespace: Any, **kwargs: Any) -> Any:
1✔
32
        """List resource quotas."""
33
        return self.client.list_namespaced_resource_quota(namespace, **kwargs)
34

35
    def create_namespaced_resource_quota(self, namespace: Any, body: Any, **kwargs: Any) -> Any:
1✔
36
        """Create a resource quota."""
37
        return self.client.create_namespaced_resource_quota(namespace, body, **kwargs)
38

39
    def delete_namespaced_resource_quota(self, name: Any, namespace: Any, **kwargs: Any) -> Any:
1✔
40
        """Delete a resource quota."""
41
        return self.client.delete_namespaced_resource_quota(name, namespace, **kwargs)
42

43
    def patch_namespaced_resource_quota(self, name: Any, namespace: Any, body: Any, **kwargs: Any) -> Any:
1✔
44
        """Update a resource quota."""
45
        return self.client.delete_namespaced_resource_quota(name, namespace, body, **kwargs)
46

47

48
class K8sSchedulingClient(K8sSchedudlingClientInterface):  # pragma:nocover
1✔
49
    """Real k8s scheduling API client that exposes the required functions."""
1✔
50

51
    def __init__(self):
1✔
52
        try:
53
            InClusterConfigLoader(
54
                token_filename=SERVICE_TOKEN_FILENAME,
55
                cert_filename=SERVICE_CERT_FILENAME,
56
            ).load_and_set()
57
        except ConfigException:
58
            config.load_config()
59
        self.client = client.SchedulingV1Api()
60

61
    def create_priority_class(self, body: Any, **kwargs: Any) -> Any:
1✔
62
        """Create a priority class."""
63
        return self.client.create_priority_class(body, **kwargs)
64

65
    def delete_priority_class(self, name: Any, **kwargs: Any) -> Any:
1✔
66
        """Delete a priority class."""
67
        return self.client.delete_priority_class(name, **kwargs)
68

69

70
class DummyCoreClient(K8sCoreClientInterface):
1✔
71
    """Dummy k8s core API client that does not require a k8s cluster.
1✔
72

73
    Not suitable for production - to be used only for testing and development.
74
    """
75

76
    def __init__(self, quotas: Dict[str, client.V1ResourceQuota]):
1✔
77
        self.quotas = quotas
1✔
78
        self._lock = Lock()
1✔
79

80
    def read_namespaced_resource_quota(self, name: Any, namespace: Any, **kwargs: Any) -> Any:
1✔
81
        """Get a resource quota."""
82
        with self._lock:
1✔
83
            quota = self.quotas.get(name)
1✔
84
            if quota is None:
1✔
85
                raise client.ApiException(status=404)
×
86
            return quota
1✔
87

88
    def list_namespaced_resource_quota(self, namespace: Any, **kwargs: Any) -> Any:
1✔
89
        """List resource quotas."""
90
        with self._lock:
1✔
91
            return client.V1ResourceQuotaList(items=list(self.quotas.values()))
1✔
92

93
    def create_namespaced_resource_quota(self, namespace: Any, body: Any, **kwargs: Any) -> Any:
1✔
94
        """Create a resource quota."""
95
        with self._lock:
1✔
96
            if isinstance(body.metadata, dict):
1✔
97
                body.metadata = client.V1ObjectMeta(**body.metadata)
1✔
98
            body.metadata.uid = uuid4()
1✔
99
            body.api_version = "v1"
1✔
100
            body.kind = "ResourceQuota"
1✔
101
            self.quotas[body.metadata.name] = body
1✔
102
            return body
1✔
103

104
    def delete_namespaced_resource_quota(self, name: Any, namespace: Any, **kwargs: Any) -> Any:
1✔
105
        """Delete a resource quota."""
106
        with self._lock:
1✔
107
            removed_quota = self.quotas.pop(name, None)
1✔
108
            if removed_quota is None:
1✔
109
                raise client.ApiException(status=404)
×
110
            return removed_quota
1✔
111

112
    def patch_namespaced_resource_quota(self, name: Any, namespace: Any, body: Any, **kwargs: Any) -> Any:
1✔
113
        """Update a resource quota."""
114
        with self._lock:
1✔
115
            old_quota = self.quotas.get(name)
1✔
116
            if old_quota is None:
1✔
117
                raise client.ApiException(status=404)
×
118
            new_quota = deepcopy(old_quota)
1✔
119
            if isinstance(body, client.V1ResourceQuota):
1✔
120
                new_quota.spec = body.spec
1✔
121
            if isinstance(body, dict):
1✔
122
                new_quota.spec = client.V1ResourceQuota(**body).spec
×
123
            self.quotas[name] = new_quota
1✔
124
            return new_quota
1✔
125

126

127
class DummySchedulingClient(K8sSchedudlingClientInterface):
1✔
128
    """Dummy k8s scheduling API client that does not require a k8s cluster.
1✔
129

130
    Not suitable for production - to be used only for testing and development.
131
    """
132

133
    def __init__(self, pcs: Dict[str, client.V1PriorityClass]):
1✔
134
        self.pcs = pcs
1✔
135
        self._lock = Lock()
1✔
136

137
    def create_priority_class(self, body: Any, **kwargs: Any) -> Any:
1✔
138
        """Create a priority class."""
139
        with self._lock:
1✔
140
            if isinstance(body.metadata, dict):
1✔
141
                body.metadata = client.V1ObjectMeta(**body.metadata)
×
142
            body.metadata.uid = uuid4()
1✔
143
            body.api_version = "v1"
1✔
144
            body.kind = "PriorityClass"
1✔
145
            self.pcs[body.metadata.name] = body
1✔
146
            return body
1✔
147

148
    def delete_priority_class(self, name: Any, **kwargs: Any) -> Any:
1✔
149
        """Delete a priority class."""
150
        with self._lock:
1✔
151
            removed_pc = self.pcs.pop(name, None)
1✔
152
            if removed_pc is None:
1✔
153
                raise client.ApiException(status=404)
×
154
            return removed_pc
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc