• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 27152425961

08 Jun 2026 04:38PM UTC coverage: 82.105% (+0.06%) from 82.044%
27152425961

Pull #108

github

web-flow
Merge ed0b797b1 into 837aa7bf6
Pull Request #108: Fix test-only JSON patches treated as non-empty mutations

1081 of 1496 branches covered (72.26%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

4443 of 5232 relevant lines covered (84.92%)

4.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.35
/src/main/python/kubernator/plugins/k8s_api.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import json
5✔
20
import re
5✔
21
from collections import namedtuple
5✔
22
from collections.abc import Callable, Mapping, MutableMapping, Sequence
5✔
23
from enum import Enum, auto
5✔
24
from functools import partial, wraps
5✔
25
from pathlib import Path
5✔
26
from typing import Union, Optional
5✔
27

28
import yaml
5✔
29
from jsonschema.exceptions import ValidationError
5✔
30

31
from kubernator.api import load_file, FileType, load_remote_file, calling_frame_source, parse_yaml_docs
5✔
32

33

34
def api_exc_normalize_body(e):
    """Parse a raw ApiException body (JSON or YAML) into a Python object in-place.

    The ``content-type`` header selects the parser. Media-type parameters
    (e.g. ``; charset=utf-8``), surrounding whitespace and letter case are
    ignored when matching. The body is left untouched when the header is
    missing, the media type is not JSON/YAML, or the body fails to parse, so
    that normalization never replaces the original API error with a parser
    error.

    Idempotent: if e.body is already a parsed object, it is left unchanged.

    :param e: exception object exposing ``body`` and ``headers`` attributes
    """
    if not isinstance(e.body, (str, bytes)):
        return
    if not e.headers or "content-type" not in e.headers:
        return

    # Drop parameters such as "; charset=utf-8" before comparing media types.
    content_type = e.headers["content-type"].partition(";")[0].strip().lower()

    if content_type == "application/json" or content_type.endswith("+json"):
        try:
            e.body = json.loads(e.body)
        except ValueError:
            # Malformed JSON (JSONDecodeError/UnicodeDecodeError are ValueErrors):
            # keep the raw body rather than mask the API error being propagated.
            pass
    elif (content_type in ("application/yaml", "application/x-yaml", "text/yaml",
                           "text/x-yaml") or content_type.endswith("+yaml")):
        try:
            e.body = yaml.safe_load(e.body)
        except yaml.YAMLError:
            # Same best-effort policy as for JSON: leave the raw body in place.
            pass
48

49

50
def api_exc_format_body(e):
    """Turn a parsed ApiException body back into text for human-readable display.

    A parsed body is replaced in-place by its JSON rendering (4-space indent).
    Idempotent: a body that is already ``str``/``bytes`` is left unchanged.
    """
    body = e.body
    if isinstance(body, (str, bytes)):
        return
    e.body = json.dumps(body, indent=4)
57

58

59
def _normalize_api_exc(func):
    """Decorator: parse the body of any ApiException raised by *func*, then re-raise it."""

    @wraps(func)
    def normalizing_call(*call_args, **call_kwargs):
        # Imported at call time, so the kubernetes client is only required
        # once a decorated function is actually invoked.
        from kubernetes.client import ApiException

        try:
            result = func(*call_args, **call_kwargs)
        except ApiException as api_exc:
            api_exc_normalize_body(api_exc)
            raise
        return result

    return normalizing_call
70

71

72
# One entry of an HTTP ``Warning`` header: 3-digit code, agent token, quoted
# text and an optional second quoted string; entries may be comma-separated.
K8S_WARNING_HEADER = re.compile(r'(?:,\s*)?(\d{3})\s+(\S+)\s+"(.+?)(?<!\\)"(?:\s+\"(.+?)(?<!\\)\")?\s*')

# Helpers for converting CamelCase kinds into snake_case.
UPPER_FOLLOWED_BY_LOWER_RE = re.compile(r"(.)([A-Z][a-z]+)")
LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")

# API paths ending in a resource collection; group 1 captures the final path
# segment (used as the plural resource name).
CLUSTER_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}([^/]+)$")
NAMESPACED_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}namespaces/[^/]+/([^/]+)$")

# Metadata annotation read by K8SResource.project.
PROJECT_ANNOTATION = "kubernator.io/project"
80

81

82
class K8SResourcePatchType(Enum):
    """Kinds of patch request that K8SResource.patch knows how to send."""

    # Sent with content type "application/json-patch+json".
    JSON_PATCH = auto()

    # Sent with content type "application/apply-patch+yaml".
    SERVER_SIDE_PATCH = auto()
85

86

87
class K8SPropagationPolicy(Enum):
    """Deletion propagation policies; ``policy`` holds the string passed to the API."""

    def __init__(self, policy):
        # Each member value is a 1-tuple, unpacked by Enum into this argument.
        self.policy = policy

    BACKGROUND = ("Background",)
    FOREGROUND = ("Foreground",)
    ORPHAN = ("Orphan",)
94

95

96
def to_group_and_version(api_version):
    """Split an ``apiVersion`` string into a ``(group, version)`` pair.

    The string is split at the first ``/``. When nothing follows the split
    point (no slash at all, or a trailing one), the leading part is treated
    as the version and the group is the empty string.
    """
    head, _, tail = api_version.partition("/")
    if tail:
        return head, tail
    return "", head
102

103

104
def to_k8s_resource_def_key(manifest):
    """Build the K8SResourceDefKey (group, version, kind) identifying a manifest's type."""
    group, version = to_group_and_version(manifest["apiVersion"])
    return K8SResourceDefKey(group, version, manifest["kind"])
107

108

109
class K8SResourceDefKey(namedtuple("K8SResourceDefKey", ["group", "version", "kind"])):
    """Immutable (group, version, kind) triple identifying a resource type."""

    __slots__ = ()

    def __str__(self):
        """Render as ``group/version/kind``, or ``version/kind`` for the group-less core API.

        An empty group contributes no leading slash, so the prefix reads like
        the manifest's ``apiVersion`` (e.g. ``v1/Pod`` rather than ``/v1/Pod``).
        """
        if self.group:
            return f"{self.group}/{self.version}/{self.kind}"
        return f"{self.version}/{self.kind}"
114

115

116
class K8SResourceDef:
    """Describes one Kubernetes resource type and holds its bound API callables.

    Carries the type's key (group/version/kind), singular and plural names,
    scope, whether it is a custom resource, and its schema. After
    :meth:`populate_api` has run, the ``get``/``create``/``patch``/``delete``/
    ``list`` properties expose the client callables for this type.
    """

    def __init__(self, key, singular, plural, namespaced, custom, schema):
        """Store the definition; API callables stay unset until populate_api().

        :param key: key exposing ``group``, ``version`` and ``kind``
        :param singular: singular resource name
        :param plural: plural resource name, or None when no API path is known
        :param namespaced: True when the resource is namespace-scoped
        :param custom: True when the definition originates from a CRD
        :param schema: schema associated with this resource type
        """
        self.key = key
        self.singular = singular
        self.plural = plural
        self.namespaced = namespaced
        self.custom = custom
        self.schema = schema

        self._api_get = None
        self._api_create = None
        self._api_patch = None
        self._api_delete = None
        self._api_list = None

    @property
    def group(self) -> str:
        return self.key.group

    @property
    def version(self) -> str:
        return self.key.version

    @property
    def kind(self) -> str:
        return self.key.kind

    @property
    def has_api(self) -> bool:
        """True when the resource is custom or has a known plural name."""
        # bool() keeps the declared return type: the bare `or` expression
        # would hand back the plural string (or None) instead of a boolean.
        return bool(self.custom or self.plural)

    @property
    def get(self):
        return self._api_get

    @property
    def create(self):
        return self._api_create

    @property
    def patch(self):
        return self._api_patch

    @property
    def delete(self):
        return self._api_delete

    @property
    def list(self):
        return self._api_list

    def __eq__(self, o: object) -> bool:
        """Compare every descriptive field except the schema and the bound API callables."""
        if not isinstance(o, K8SResourceDef):
            # Let Python try the reflected comparison before falling back to
            # "not equal".
            return NotImplemented

        return (self.key == o.key and
                self.singular == o.singular and
                self.plural == o.plural and
                self.namespaced == o.namespaced and
                self.custom == o.custom)

    def __hash__(self) -> int:
        return self.key.__hash__()

    def __str__(self):
        return f"{self.key=}, {self.singular=}, {self.plural=}, {self.namespaced=}, {self.custom=}"

    @classmethod
    def from_manifest(cls, key: "K8SResourceDefKey",
                      schema,
                      paths: Mapping["K8SResourceDefKey", Mapping[str, Mapping]]):
        """Yield the (non-custom) definition for *key*, deriving plural and scope from API paths.

        The singular name is the lower-cased kind. ``namespace`` is special-cased
        to the plural ``namespaces``. Otherwise the paths registered for *key*
        are scanned: the first namespaced collection path wins and marks the
        resource as namespaced; a cluster-level collection path only supplies
        the plural, and scanning continues in case a namespaced path follows.
        When no path matches, the plural stays None.

        :param key: resource type key
        :param schema: schema to attach to the definition
        :param paths: mapping of resource type key to its API paths
        """
        singular = key.kind.lower()

        plural = None
        namespaced = False

        if singular == "namespace":
            plural = "namespaces"
        else:
            for path in paths.get(key, ()):
                if m := NAMESPACED_RESOURCE_PATH.fullmatch(path):
                    plural = m[1]
                    namespaced = True
                    break
                elif m := CLUSTER_RESOURCE_PATH.fullmatch(path):
                    plural = m[1]

        yield K8SResourceDef(key, singular, plural, namespaced, False, schema)

    @classmethod
    def from_resource(cls, resource: "K8SResource"):
        """Yield one custom definition per version declared in a CRD resource.

        Names, group and scope are read from the CRD's ``spec``. For a CRD whose
        own API version is ``v1`` the schema is taken from each version entry;
        otherwise the single ``spec.validation`` schema is used for all versions.

        :param resource: resource wrapping a CustomResourceDefinition manifest
        """
        manifest = resource.manifest
        spec = manifest["spec"]
        group = spec["group"]
        names = spec["names"]
        kind = names["kind"]
        singular = names.get("singular", names["kind"].lower())
        plural = names["plural"]
        namespaced = spec["scope"] == "Namespaced"

        for version_spec in spec["versions"]:
            version = version_spec["name"]
            if resource.version == "v1":
                schema = version_spec["schema"]["openAPIV3Schema"]
            else:
                schema = spec["validation"]["openAPIV3Schema"]
            yield K8SResourceDef(K8SResourceDefKey(group, version, kind), singular, plural, namespaced, True, schema)

    def populate_api(self, k8s_client_module, k8s_client):
        """Bind the get/patch/create/delete/list callables for this resource type.

        Does nothing when the callables are already bound. All five callables
        are resolved before any is stored, so a failed lookup leaves the
        definition unpopulated instead of half-bound.

        :param k8s_client_module: client module providing the API classes
        :param k8s_client: API client instance passed to the API class
        :raises RuntimeError: when the definition has no API (see has_api)
        :raises AttributeError: when the client lacks an expected class or method
        """
        if not self.has_api:
            raise RuntimeError(f"{self} has no API")

        if self._api_get:
            return

        group = self.group or "core"
        version = self.version
        kind = self.kind

        if self.custom:
            k8s_api = k8s_client_module.CustomObjectsApi(k8s_client)

            kwargs = {"group": group,
                      "version": version,
                      "plural": self.plural}
            scope = "namespaced" if self.namespaced else "cluster"
            read_verb = "get"

            def resolve(verb):
                # Custom objects share generic methods; pin group/version/plural.
                return partial(getattr(k8s_api, f"{verb}_{scope}_custom_object"), **kwargs)
        else:
            # Take care for the case e.g. api_type is "apiextensions.k8s.io"
            # Only replace the last instance
            group = "".join(group.rsplit(".k8s.io", 1))

            # convert group name from DNS subdomain format to
            # python class name convention
            group = "".join(word.capitalize() for word in group.split('.'))
            fcn_to_call = f"{group}{version.capitalize()}Api"
            k8s_api = getattr(k8s_client_module, fcn_to_call)(k8s_client)

            # Replace CamelCased action_type into snake_case
            kind = UPPER_FOLLOWED_BY_LOWER_RE.sub(r"\1_\2", kind)
            kind = LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE.sub(r"\1_\2", kind).lower()

            target = f"namespaced_{kind}" if self.namespaced else kind
            read_verb = "read"

            def resolve(verb):
                return getattr(k8s_api, f"{verb}_{target}")

        # Resolve everything first; assign only once all lookups succeeded.
        api_get, api_patch, api_create, api_delete, api_list = (
            resolve(verb) for verb in (read_verb, "patch", "create", "delete", "list"))

        self._api_get = api_get
        self._api_patch = api_patch
        self._api_create = api_create
        self._api_delete = api_delete
        self._api_list = api_list
280

281

282
class K8SResourceKey(namedtuple("K8SResourceKey", ["group", "kind", "name", "namespace"])):
    """Immutable (group, kind, name, namespace) tuple identifying a single resource."""

    __slots__ = ()

    def __str__(self):
        """Render as ``group/kind/name[.namespace]``; a falsy group is followed by ``v1/``."""
        separator = "/" if self.group else "v1/"
        suffix = f".{self.namespace}" if self.namespace else ""
        return f"{self.group}{separator}{self.kind}/{self.name}{suffix}"
288

289

290
class K8SResource:
    """A single Kubernetes object manifest paired with its resource definition.

    Wraps the manifest dictionary, derives an identifying key from it and
    offers get/create/patch/delete/watch operations that delegate to the API
    callables bound on the resource definition (``rdef``).
    """

    # Class-level settings. They are None here and nothing in this class
    # assigns them; the API methods below read them as-is.
    _k8s_client_version = None
    _k8s_field_validation = None
    _k8s_field_validation_patched = None
    _logger = None
    _api_warnings = None

    def __init__(self, manifest: dict, rdef: K8SResourceDef, source: Optional[Union[str, Path]] = None):
        """
        :param manifest: manifest with ``apiVersion``, ``kind`` and ``metadata.name``
        :param rdef: resource definition for the manifest's type
        :param source: where the manifest came from (used in messages)
        """
        self.key = self.get_manifest_key(manifest)

        self.manifest = manifest
        self.rdef = rdef
        self.source = source

    @property
    def group(self) -> str:
        return self.key.group

    @property
    def version(self) -> str:
        return self.rdef.version

    @property
    def kind(self) -> str:
        return self.key.kind

    @property
    def name(self) -> str:
        return self.key.name

    @name.setter
    def name(self, value):
        # Write through to the manifest, then recompute the key derived from it.
        self.manifest["metadata"]["name"] = value
        self.key = self.get_manifest_key(self.manifest)

    @property
    def namespace(self) -> Optional[str]:
        return self.key.namespace

    @namespace.setter
    def namespace(self, value):
        # Write through to the manifest, then recompute the key derived from it.
        self.manifest["metadata"]["namespace"] = value
        self.key = self.get_manifest_key(self.manifest)

    @property
    def api_version(self) -> str:
        return self.manifest["apiVersion"]

    @property
    def schema(self) -> dict:
        return self.rdef.schema

    @property
    def is_crd(self):
        """True when the manifest is an ``apiextensions.k8s.io`` CustomResourceDefinition."""
        return self.group == "apiextensions.k8s.io" and self.kind == "CustomResourceDefinition"

    @property
    def project(self) -> Optional[str]:
        """Value of the project annotation, or None when metadata/annotations lack it."""
        metadata = self.manifest.get("metadata") or {}
        annotations = metadata.get("annotations") or {}
        return annotations.get(PROJECT_ANNOTATION)

    def __str__(self):
        return f"{self.api_version}/{self.kind}/{self.name}{'.' + self.namespace if self.namespace else ''}"

    @_normalize_api_exc
    def get(self):
        """Fetch the object through the bound ``get`` callable and return the parsed JSON."""
        rdef = self.rdef
        kwargs = {"name": self.name,
                  "_preload_content": False}
        if rdef.namespaced:
            kwargs["namespace"] = self.namespace
        return json.loads(rdef.get(**kwargs).data)

    @_normalize_api_exc
    def create(self, dry_run=True):
        """Create the object from the manifest and return the parsed JSON response.

        :param dry_run: when true, the request is sent with ``dry_run="All"``
        """
        rdef = self.rdef
        kwargs = {"body": self.manifest,
                  "_preload_content": False,
                  "field_manager": "kubernator",
                  }

        # For custom resources field_validation is only sent when
        # `_k8s_field_validation_patched` is set; that restriction is to be
        # removed after solving https://github.com/kubernetes-client/gen/issues/259
        if self._k8s_field_validation_patched or not self.rdef.custom:
            kwargs["field_validation"] = self._k8s_field_validation
        if rdef.namespaced:
            kwargs["namespace"] = self.namespace
        if dry_run:
            kwargs["dry_run"] = "All"
        resp = rdef.create(**kwargs)
        self._process_response_headers(resp)
        return json.loads(resp.data)

    @_normalize_api_exc
    def patch(self, json_patch, *, patch_type: K8SResourcePatchType, force=False, dry_run=True):
        """Patch the object and return the parsed JSON response.

        :param json_patch: request body for the patch
        :param patch_type: selects the request content type
        :param force: passed as ``force`` for server-side patches only
        :param dry_run: when true, the request is sent with ``dry_run="All"``
        """
        rdef = self.rdef
        kwargs = {"name": self.name,
                  "body": json_patch,
                  "_preload_content": False,
                  "field_manager": "kubernator",
                  }

        # For custom resources field_validation is only sent when
        # `_k8s_field_validation_patched` is set; that restriction is to be
        # removed after solving https://github.com/kubernetes-client/gen/issues/259
        if self._k8s_field_validation_patched or not self.rdef.custom:
            kwargs["field_validation"] = self._k8s_field_validation
        if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
            kwargs["force"] = force
        if rdef.namespaced:
            kwargs["namespace"] = self.namespace
        if dry_run:
            kwargs["dry_run"] = "All"

        def select_header_content_type_patch(content_types, method=None, body=None):
            if patch_type == K8SResourcePatchType.JSON_PATCH:
                return "application/json-patch+json"
            if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
                return "application/apply-patch+yaml"
            raise NotImplementedError

        # The bound callable is either a functools.partial around an API
        # method or the API method itself; both lead to the API client.
        if isinstance(rdef.patch, partial):
            api_client = rdef.patch.func.__self__.api_client
        else:
            api_client = rdef.patch.__self__.api_client

        # Temporarily replace the client's content-type selection so the
        # request carries the media type for patch_type; always restored.
        old_func = api_client.select_header_content_type
        try:
            api_client.select_header_content_type = select_header_content_type_patch
            resp = rdef.patch(**kwargs)
            self._process_response_headers(resp)
            return json.loads(resp.data)
        finally:
            api_client.select_header_content_type = old_func

    @_normalize_api_exc
    def delete(self, *, dry_run=True, propagation_policy=K8SPropagationPolicy.BACKGROUND, wait=True):
        """Delete the object and return the parsed JSON response of the delete call.

        :param dry_run: when true, the request is sent with ``dry_run="All"``
        :param propagation_policy: policy whose ``policy`` string is sent
        :param wait: when true (and not a dry run), block until the object is gone
        """
        from kubernetes.client import ApiException
        rdef = self.rdef
        kwargs = {"name": self.name,
                  "_preload_content": False,
                  "propagation_policy": propagation_policy.policy
                  }
        if rdef.namespaced:
            kwargs["namespace"] = self.namespace
        if dry_run:
            kwargs["dry_run"] = "All"

        result = json.loads(rdef.delete(**kwargs).data)

        if wait and not dry_run:
            # Wait for the resource to actually disappear by watching for the
            # DELETED event. If the resource is already gone before the watch
            # opens (race), the watch produces no events; loop with a short
            # server-side timeout and re-check existence via get() to break out.
            while True:
                for event in self.watch(timeout_seconds=10):
                    if event["type"] == "DELETED":
                        return result
                try:
                    self.get()
                except ApiException as e:
                    if e.status == 404:
                        return result
                    raise

        return result

    def watch(self, *, timeout_seconds=None):
        """Return a watch stream over the bound ``list`` callable, filtered to this object's name.

        :param timeout_seconds: passed to the list call when not None
        """
        from kubernetes import watch as k8s_watch
        rdef = self.rdef
        kwargs = {"field_selector": f"metadata.name={self.name}"}
        if rdef.namespaced:
            kwargs["namespace"] = self.namespace
        if timeout_seconds is not None:
            kwargs["timeout_seconds"] = timeout_seconds
        return k8s_watch.Watch().stream(rdef.list, **kwargs)

    @staticmethod
    def get_manifest_key(manifest):
        """Build the K8SResourceKey (group, kind, name, namespace) for a manifest."""
        return K8SResourceKey(to_group_and_version(manifest["apiVersion"])[0],
                              manifest["kind"],
                              manifest["metadata"]["name"],
                              manifest["metadata"].get("namespace"))

    @staticmethod
    def get_manifest_description(manifest: dict, source=None):
        """Describe a manifest as ``apiVersion/kind/name[.namespace]`` for messages.

        Missing parts are replaced by placeholders. The manifest may not have
        been validated yet, so a ``metadata`` entry that is not a mapping is
        ignored and the namespace is formatted whatever its type.

        :param manifest: manifest mapping, possibly incomplete
        :param source: accepted but not used
        """
        api_version = manifest.get("apiVersion")
        kind = manifest.get("kind")
        metadata = manifest.get("metadata")
        name = None
        namespace = None
        if isinstance(metadata, Mapping):
            name = metadata.get("name")
            namespace = metadata.get("namespace")
        return (f"{api_version or 'unknown'}/{kind or '<unknown>'}/"
                f"{name or '<unknown>'}{f'.{namespace}' if namespace else ''}")

    def __eq__(self, other):
        """Resources are equal when both their keys and their manifests are equal."""
        if not isinstance(other, K8SResource):
            # Let Python try the reflected comparison before falling back to
            # "not equal".
            return NotImplemented
        return self.key == other.key and self.manifest == other.manifest

    def _process_response_headers(self, resp):
        """Dispatch ``Warning`` response headers: code 299 to ``_api_warnings``, others to the log."""
        headers = resp.headers
        warn_headers = headers.get("Warning")
        if not warn_headers:
            return

        for code, _, msg, _ in K8S_WARNING_HEADER.findall(warn_headers):
            code = int(code)
            # Resolve backslash escapes in the quoted text. unicode_escape
            # decodes raw bytes as latin-1, so the text is encoded as latin-1
            # (with backslashreplace for anything beyond it) to keep
            # non-ASCII characters intact; encoding to UTF-8 first would
            # turn each of them into mojibake.
            msg = msg.encode("latin-1", "backslashreplace").decode("unicode_escape")
            if code == 299:
                self._api_warnings(self, msg)
            else:
                self._logger.warning("Unknown API warning received for resource %s from %s: code %d: %s",
                                     self, self.source, code, msg)
504

505

506
class K8SResourcePluginMixin:
    """Mixin that loads, validates, transforms and registers K8S resources and CRDs.

    The methods use ``self.logger``, which this mixin does not define; the
    host class must provide it. ``validator`` must be set before any
    resource is added (see :meth:`_require_validator`).
    """

    def __init__(self):
        self.validator = None
        self.resources: MutableMapping[K8SResourceKey, K8SResource] = {}

    def _require_validator(self):
        """Return the validator, raising RuntimeError while it is still unset."""
        if self.validator is None:
            raise RuntimeError(
                "K8S validator not initialised; handle_start() must run first")
        return self.validator

    @property
    def resource_definitions(self) -> MutableMapping[K8SResourceDefKey, K8SResourceDef]:
        return self._require_validator().resource_definitions

    @property
    def resource_paths(self) -> MutableMapping[K8SResourceDefKey, MutableMapping[str, dict]]:
        return self._require_validator().resource_paths

    def add_resources(self, manifests: Union[str, list, dict], source: Optional[Union[str, Path]] = None):
        """Add one or more resources given as YAML text, a single manifest or an iterable.

        :param manifests: YAML text (possibly multi-document), one manifest
            mapping, or an iterable of manifests; falsy entries are skipped
        :param source: origin used in messages; defaults to the calling frame
        :return: a single resource for a mapping, otherwise a list of resources
        """
        if not source:
            source = calling_frame_source()

        if isinstance(manifests, str):
            manifests = list(parse_yaml_docs(manifests, source))

        if isinstance(manifests, Mapping):
            return self.add_resource(manifests, source)
        else:
            return [self.add_resource(m, source) for m in manifests if m]

    def add_resource(self, manifest: dict, source: Optional[Union[str, Path]] = None):
        """Validate, transform, re-validate and register a single manifest.

        :param manifest: manifest mapping
        :param source: origin used in messages; defaults to the calling frame
        :return: the registered resource (an identical, already registered one is reused)
        :raises Exception: whatever the transformers raise, or the first
            validation error of the transformed manifest
        """
        if not source:
            source = calling_frame_source()
        resource = self._create_resource(manifest, source)

        try:
            trans_resource = self._transform_resource(list(self.resources.values()), resource)
        except Exception as e:
            self.logger.error("An error occurred running transformers on %s", resource, exc_info=e)
            raise

        # Transformers may have changed the manifest, so validate it again.
        errors = list(self._validate_resource(trans_resource.manifest, source))
        if errors:
            for error in errors:
                # Always log, falling back to a placeholder for the source,
                # matching the error reporting in _create_resource().
                self.logger.error("Error detected in re-transformed K8S resource %s generated through %s",
                                  trans_resource, source or "<unknown>", exc_info=error)
            raise errors[0]

        return self._add_resource(trans_resource, source)

    def add_crds(self, manifests: Union[str, list, dict], source: Optional[Union[str, Path]] = None):
        """Register CRD definitions given as YAML text, a single manifest or an iterable.

        :param manifests: YAML text (possibly multi-document), one manifest
            mapping, or an iterable of manifests; falsy entries are skipped
        :param source: origin used in messages; defaults to the calling frame
        :return: a single resource for a mapping, otherwise a list of resources
        """
        if not source:
            source = calling_frame_source()

        if isinstance(manifests, str):
            manifests = list(parse_yaml_docs(manifests, source))

        if isinstance(manifests, Mapping):
            return self.add_crd(manifests, source)
        else:
            return [self.add_crd(m, source) for m in manifests if m]

    def add_crd(self, manifest: dict, source: Optional[Union[str, Path]] = None):
        """Register the resource definitions of a CRD without adding the CRD as a resource.

        :raises ValueError: when the manifest is not a CustomResourceDefinition
        """
        if not source:
            source = calling_frame_source()
        resource = self._create_resource(manifest, source)
        if not resource.is_crd:
            resource_description = K8SResource.get_manifest_description(manifest, source)
            raise ValueError(f"K8S manifest {resource_description} from {source} is not a CRD")

        self._add_crd(resource)
        return resource

    def create_resource(self, manifest: dict, source: Optional[Union[str, Path]] = None):
        """Create K8S resource without adding it"""
        if not source:
            source = calling_frame_source()

        return self._create_resource(manifest, source)

    def add_local_resources(self, path: Path, file_type: FileType, source: Optional[str] = None):
        """Load manifests from a local file and add each non-empty one as a resource."""
        manifests = load_file(self.logger, path, file_type)

        return [self.add_resource(m, source or path) for m in manifests if m]

    def add_remote_resources(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
                             source: Optional[str] = None):
        """Load manifests from a URL and add each non-empty one as a resource."""
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)

        return [self.add_resource(m, source or url) for m in manifests if m]

    def add_local_crds(self, path: Path, file_type: FileType, source: Optional[str] = None):
        """Load manifests from a local file and register each non-empty one as a CRD."""
        manifests = load_file(self.logger, path, file_type)

        return [self.add_crd(m, source or path) for m in manifests if m]

    def add_remote_crds(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
                        source: Optional[str] = None):
        """Load manifests from a URL and register each non-empty one as a CRD."""
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)

        return [self.add_crd(m, source or url) for m in manifests if m]

    def get_api_versions(self):
        """Return the validator's API versions.

        :raises RuntimeError: when the validator has not been initialised
        """
        # Go through _require_validator() like every other accessor, so an
        # unset validator gives the explanatory RuntimeError rather than an
        # AttributeError on None.
        return self._require_validator().api_versions()

    def _create_resource(self, manifest: dict, source: Optional[Union[str, Path]] = None):
        """Patch and validate a manifest, then wrap it in a K8SResource.

        :raises Exception: the first validation error, after all have been logged
        """
        resource_description = K8SResource.get_manifest_description(manifest, source)

        new_manifest = self._patch_manifest(manifest, resource_description)
        if new_manifest != manifest:
            manifest = new_manifest
            resource_description = K8SResource.get_manifest_description(manifest, source)

        self.logger.debug("Validating K8S manifest for %s", resource_description)
        errors = list(self._validate_resource(manifest, source))
        if errors:
            for error in errors:
                self.logger.error("Error detected in K8S manifest %s from %s: \n%s",
                                  resource_description, source or "<unknown>", yaml.safe_dump(manifest, None),
                                  exc_info=error)
            raise errors[0]

        rdef = self._get_manifest_rdef(manifest)
        return K8SResource(manifest, rdef, source)

    def _add_resource(self, resource: K8SResource, source):
        """Register a resource under its key; registering an identical one again is a no-op.

        :return: the registered resource (the existing one when identical)
        :raises ValidationError: when a different resource already uses the key
        """
        if resource.key in self.resources:
            existing_resource = self.resources[resource.key]
            if resource != existing_resource:
                raise ValidationError("resource %s from %s already exists and was added from %s" %
                                      (resource.key, resource.source, existing_resource.source))
            self.logger.trace("K8S resource for %s from %s is already present and is identical", resource, source)
            return existing_resource

        self.logger.info("Adding K8S resource for %s from %s", resource, source)
        self.resources[resource.key] = resource

        if resource.is_crd:
            self._add_crd(resource)

        return resource

    def _patch_manifest(self,
                        manifest: dict,
                        resource_description: str):
        """Hook: return the manifest to use; this base implementation returns it unchanged."""
        return manifest

    def _transform_resource(self,
                            resources: Sequence[K8SResource],
                            resource: K8SResource) -> K8SResource:
        """Hook: return the transformed resource; this base implementation returns it unchanged."""
        return resource

    def _filter_resources(self, func: Callable[[K8SResource], bool]):
        """Yield the registered resources for which *func* returns true."""
        yield from filter(func, self.resources.values())

    def _validate_resource(self, manifest: dict, source: Optional[Union[str, Path]] = None):
        """Yield the validator's errors for a manifest (``source`` is not used here)."""
        yield from self._require_validator().iter_manifest_errors(manifest)

    def _get_manifest_rdef(self, manifest):
        return self._require_validator().get_manifest_rdef(manifest)

    def _add_crd(self, resource: K8SResource):
        """Register every resource definition declared by a CRD resource."""
        for crd in K8SResourceDef.from_resource(resource):
            self.logger.info("Adding K8S CRD definition %s", crd.key)
            self.resource_definitions[crd.key] = crd
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc