• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 18890579593

28 Oct 2025 10:07PM UTC coverage: 75.024% (-0.2%) from 75.193%
18890579593

push

github

web-flow
Merge pull request #87 from karellen/files_and_parsing

New FileTypes, template functions, manifest patcher

635 of 1004 branches covered (63.25%)

Branch coverage included in aggregate %.

42 of 54 new or added lines in 4 files covered. (77.78%)

7 existing lines in 3 files now uncovered.

2507 of 3184 relevant lines covered (78.74%)

4.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/src/main/python/kubernator/plugins/k8s_api.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import base64
6✔
20
import json
6✔
21
import re
6✔
22
from collections import namedtuple
6✔
23
from collections.abc import Callable, Mapping, MutableMapping, Sequence, Iterable
6✔
24
from enum import Enum, auto
6✔
25
from functools import partial
6✔
26
from pathlib import Path
6✔
27
from typing import Union, Optional
6✔
28

29
import yaml
6✔
30
from jsonschema._format import FormatChecker
6✔
31
from jsonschema._keywords import required
6✔
32
from jsonschema.exceptions import ValidationError
6✔
33
from jsonschema.validators import extend, Draft7Validator
6✔
34
from openapi_schema_validator import OAS31Validator
6✔
35

36
from kubernator.api import load_file, FileType, load_remote_file, calling_frame_source, parse_yaml_docs
6✔
37

38
K8S_WARNING_HEADER = re.compile(r'(?:,\s*)?(\d{3})\s+(\S+)\s+"(.+?)(?<!\\)"(?:\s+\"(.+?)(?<!\\)\")?\s*')
6✔
39
UPPER_FOLLOWED_BY_LOWER_RE = re.compile(r"(.)([A-Z][a-z]+)")
6✔
40
LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")
6✔
41

42
K8S_MINIMAL_RESOURCE_SCHEMA = {
6✔
43
    "properties": {
44
        "apiVersion": {
45
            "type": "string"
46
        },
47
        "kind": {
48
            "type": "string"
49
        },
50
        "metadata": {
51
            "type": "object",
52
            "properties": {
53
                "name": {
54
                    "type": "string"
55
                },
56
                "namespace": {
57
                    "type": "string"
58
                }
59
            },
60
            "required": ["name"]
61
        }
62
    },
63
    "type": "object",
64
    "required": ["apiVersion", "kind"]
65
}
66
K8S_MINIMAL_RESOURCE_VALIDATOR = Draft7Validator(K8S_MINIMAL_RESOURCE_SCHEMA)
6✔
67

68
CLUSTER_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}([^/]+)$")
6✔
69
NAMESPACED_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}namespaces/[^/]+/([^/]+)$")
6✔
70

71

72
class K8SResourcePatchType(Enum):
6✔
73
    JSON_PATCH = auto()
6✔
74
    SERVER_SIDE_PATCH = auto()
6✔
75

76

77
class K8SPropagationPolicy(Enum):
6✔
78
    BACKGROUND = ("Background",)
6✔
79
    FOREGROUND = ("Foreground",)
6✔
80
    ORPHAN = ("Orphan",)
6✔
81

82
    def __init__(self, policy):
6✔
83
        self.policy = policy
6✔
84

85

86
def is_integer(instance):
6✔
87
    # bool inherits from int, so ensure bools aren't reported as ints
88
    if isinstance(instance, bool):
6!
89
        return False
×
90
    return isinstance(instance, int)
6✔
91

92

93
def is_string(instance):
6✔
94
    return isinstance(instance, str)
6✔
95

96

97
def type_validator(validator, data_type, instance, schema):
6✔
98
    if instance is None:
6!
99
        return
×
100

101
    if data_type == "string" and schema.get("format") == "int-or-string":
6✔
102
        if not (is_string(instance) or is_integer(instance)):
6!
103
            yield ValidationError("%r is not of type %s" % (instance, "int-or-string"))
×
104
    elif not validator.is_type(instance, data_type):
6!
105
        yield ValidationError("%r is not of type %s" % (instance, data_type))
×
106

107

108
K8SValidator = extend(OAS31Validator, validators={
6✔
109
    "type": type_validator,
110
    "required": required
111
})
112

113
k8s_format_checker = FormatChecker()
6✔
114

115

116
@k8s_format_checker.checks("int32")
6✔
117
def check_int32(value):
6✔
118
    return value is not None and (-2147483648 < value < 2147483647)
6✔
119

120

121
@k8s_format_checker.checks("int64")
6✔
122
def check_int64(value):
6✔
123
    return value is not None and (-9223372036854775808 < value < 9223372036854775807)
6✔
124

125

126
@k8s_format_checker.checks("float")
6✔
127
def check_float(value):
6✔
128
    return value is not None and (-3.4E+38 < value < +3.4E+38)
×
129

130

131
@k8s_format_checker.checks("double")
6✔
132
def check_double(value):
6✔
133
    return value is not None and (-1.7E+308 < value < +1.7E+308)
6✔
134

135

136
@k8s_format_checker.checks("byte", ValueError)
6✔
137
def check_byte(value):
6✔
138
    if value is None:
×
139
        return False
×
140
    base64.b64decode(value, validate=True)
×
141
    return True
×
142

143

144
@k8s_format_checker.checks("int-or-string")
6✔
145
def check_int_or_string(value):
6✔
146
    return check_int32(value) if is_integer(value) else is_string(value)
6✔
147

148

149
def to_group_and_version(api_version):
6✔
150
    group, _, version = api_version.partition("/")
6✔
151
    if not version:
6✔
152
        version = group
6✔
153
        group = ""
6✔
154
    return group, version
6✔
155

156

157
def to_k8s_resource_def_key(manifest):
6✔
158
    return K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]),
×
159
                             manifest["kind"])
160

161

162
class K8SResourceDefKey(namedtuple("K8SResourceDefKey", ["group", "version", "kind"])):
6✔
163
    __slots__ = ()
6✔
164

165
    def __str__(self):
6✔
166
        return f"{self.group}{'/' if self.group else '/'}{self.version}/{self.kind}"
6✔
167

168

169
class K8SResourceDef:
6✔
170
    def __init__(self, key, singular, plural, namespaced, custom, schema):
6✔
171
        self.key = key
6✔
172
        self.singular = singular
6✔
173
        self.plural = plural
6✔
174
        self.namespaced = namespaced
6✔
175
        self.custom = custom
6✔
176
        self.schema = schema
6✔
177

178
        self._api_get = None
6✔
179
        self._api_create = None
6✔
180
        self._api_patch = None
6✔
181
        self._api_delete = None
6✔
182

183
    @property
6✔
184
    def group(self) -> str:
6✔
185
        return self.key.group
6✔
186

187
    @property
6✔
188
    def version(self) -> str:
6✔
189
        return self.key.version
6✔
190

191
    @property
6✔
192
    def kind(self) -> str:
6✔
193
        return self.key.kind
6✔
194

195
    @property
6✔
196
    def has_api(self) -> bool:
6✔
197
        return self.custom or self.plural
6✔
198

199
    @property
6✔
200
    def get(self):
6✔
201
        return self._api_get
6✔
202

203
    @property
6✔
204
    def create(self):
6✔
205
        return self._api_create
6✔
206

207
    @property
6✔
208
    def patch(self):
6✔
209
        return self._api_patch
6✔
210

211
    @property
6✔
212
    def delete(self):
6✔
213
        return self._api_delete
6✔
214

215
    def __eq__(self, o: object) -> bool:
6✔
216
        if not isinstance(o, K8SResourceDef):
×
217
            return False
×
218

219
        return (self.key == o.key and
×
220
                self.singular == o.singular and
221
                self.plural == o.plural and
222
                self.namespaced == o.namespaced and
223
                self.custom == o.custom)
224

225
    def __hash__(self) -> int:
6✔
226
        return self.key.__hash__()
×
227

228
    def __str__(self):
6✔
229
        return f"{self.key=}, {self.singular=}, {self.plural=}, {self.namespaced=}, {self.custom=}"
×
230

231
    @classmethod
6✔
232
    def from_manifest(cls, key: K8SResourceDefKey,
6✔
233
                      schema,
234
                      paths: Mapping[K8SResourceDefKey, Mapping[str, Mapping]]):
235
        singular = key.kind.lower()
6✔
236

237
        plural = None
6✔
238
        namespaced = False
6✔
239

240
        if singular == "namespace":
6✔
241
            plural = "namespaces"
6✔
242
        else:
243
            for path in paths.get(key, ()):
6✔
244
                if m := NAMESPACED_RESOURCE_PATH.fullmatch(path):
6✔
245
                    plural = m[1]
6✔
246
                    namespaced = True
6✔
247
                    break
6✔
248
                elif m := CLUSTER_RESOURCE_PATH.fullmatch(path):
6✔
249
                    plural = m[1]
6✔
250

251
        yield K8SResourceDef(key, singular, plural, namespaced, False, schema)
6✔
252

253
    @classmethod
6✔
254
    def from_resource(cls, resource: "K8SResource"):
6✔
255
        manifest = resource.manifest
6✔
256
        spec = manifest["spec"]
6✔
257
        group = spec["group"]
6✔
258
        names = spec["names"]
6✔
259
        kind = names["kind"]
6✔
260
        singular = names.get("singular", names["kind"].lower())
6✔
261
        plural = names["plural"]
6✔
262
        namespaced = spec["scope"] == "Namespaced"
6✔
263

264
        for version_spec in spec["versions"]:
6✔
265
            version = version_spec["name"]
6✔
266
            if resource.version == "v1":
6!
267
                schema = version_spec["schema"]["openAPIV3Schema"]
6✔
268
            else:
269
                schema = spec["validation"]["openAPIV3Schema"]
×
270
            yield K8SResourceDef(K8SResourceDefKey(group, version, kind), singular, plural, namespaced, True, schema)
6✔
271

272
    def populate_api(self, k8s_client_module, k8s_client):
6✔
273
        if not self.has_api:
6!
274
            raise RuntimeError(f"{self} has no API")
×
275

276
        if self._api_get:
6✔
277
            return
6✔
278

279
        group = self.group or "core"
6✔
280
        version = self.version
6✔
281
        kind = self.kind
6✔
282

283
        if self.custom:
6✔
284
            k8s_api = k8s_client_module.CustomObjectsApi(k8s_client)
6✔
285

286
            kwargs = {"group": group,
6✔
287
                      "version": version,
288
                      "plural": self.plural}
289
            if self.namespaced:
6!
290
                self._api_get = partial(k8s_api.get_namespaced_custom_object, **kwargs)
6✔
291
                self._api_patch = partial(k8s_api.patch_namespaced_custom_object, **kwargs)
6✔
292
                self._api_create = partial(k8s_api.create_namespaced_custom_object, **kwargs)
6✔
293
                self._api_delete = partial(k8s_api.delete_namespaced_custom_object, **kwargs)
6✔
294
            else:
295
                self._api_get = partial(k8s_api.get_cluster_custom_object, **kwargs)
×
296
                self._api_patch = partial(k8s_api.patch_cluster_custom_object, **kwargs)
×
297
                self._api_create = partial(k8s_api.create_cluster_custom_object, **kwargs)
×
298
                self._api_delete = partial(k8s_api.delete_cluster_custom_object, **kwargs)
×
299
        else:
300
            # Take care for the case e.g. api_type is "apiextensions.k8s.io"
301
            # Only replace the last instance
302
            group = "".join(group.rsplit(".k8s.io", 1))
6✔
303

304
            # convert group name from DNS subdomain format to
305
            # python class name convention
306
            group = "".join(word.capitalize() for word in group.split('.'))
6✔
307
            fcn_to_call = f"{group}{version.capitalize()}Api"
6✔
308
            k8s_api = getattr(k8s_client_module, fcn_to_call)(k8s_client)
6✔
309

310
            # Replace CamelCased action_type into snake_case
311
            kind = UPPER_FOLLOWED_BY_LOWER_RE.sub(r"\1_\2", kind)
6✔
312
            kind = LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE.sub(r"\1_\2", kind).lower()
6✔
313

314
            if self.namespaced:
6✔
315
                self._api_get = getattr(k8s_api, f"read_namespaced_{kind}")
6✔
316
                self._api_patch = getattr(k8s_api, f"patch_namespaced_{kind}")
6✔
317
                self._api_create = getattr(k8s_api, f"create_namespaced_{kind}")
6✔
318
                self._api_delete = getattr(k8s_api, f"delete_namespaced_{kind}")
6✔
319
            else:
320
                self._api_get = getattr(k8s_api, f"read_{kind}")
6✔
321
                self._api_patch = getattr(k8s_api, f"patch_{kind}")
6✔
322
                self._api_create = getattr(k8s_api, f"create_{kind}")
6✔
323
                self._api_delete = getattr(k8s_api, f"delete_{kind}")
6✔
324

325

326
class K8SResourceKey(namedtuple("K8SResourceKey", ["group", "kind", "name", "namespace"])):
6✔
327
    __slots__ = ()
6✔
328

329
    def __str__(self):
6✔
330
        return (f"{self.group}{'/' if self.group else 'v1/'}{self.kind}"
×
331
                f"/{self.name}{f'.{self.namespace}' if self.namespace else ''}")
332

333

334
class K8SResource:
6✔
335
    _k8s_client_version = None
6✔
336
    _k8s_field_validation = None
6✔
337
    _k8s_field_validation_patched = None
6✔
338
    _logger = None
6✔
339
    _api_warnings = None
6✔
340

341
    def __init__(self, manifest: dict, rdef: K8SResourceDef, source: Union[str, Path] = None):
6✔
342
        self.key = self.get_manifest_key(manifest)
6✔
343

344
        self.manifest = manifest
6✔
345
        self.rdef = rdef
6✔
346
        self.source = source
6✔
347

348
    @property
6✔
349
    def group(self) -> str:
6✔
350
        return self.key.group
6✔
351

352
    @property
6✔
353
    def version(self) -> str:
6✔
354
        return self.rdef.version
6✔
355

356
    @property
6✔
357
    def kind(self) -> str:
6✔
358
        return self.key.kind
6✔
359

360
    @property
6✔
361
    def name(self) -> str:
6✔
362
        return self.key.name
6✔
363

364
    @name.setter
6✔
365
    def name(self, value):
6✔
366
        self.manifest["metadata"]["name"] = value
×
367
        self.key = self.get_manifest_key(self.manifest)
×
368

369
    @property
6✔
370
    def namespace(self) -> Optional[str]:
6✔
371
        return self.key.namespace
6✔
372

373
    @namespace.setter
6✔
374
    def namespace(self, value):
6✔
375
        self.manifest["metadata"]["namespace"] = value
×
376
        self.key = self.get_manifest_key(self.manifest)
×
377

378
    @property
6✔
379
    def api_version(self) -> str:
6✔
380
        return self.manifest["apiVersion"]
6✔
381

382
    @property
6✔
383
    def schema(self) -> dict:
6✔
384
        return self.rdef.schema
×
385

386
    @property
6✔
387
    def is_crd(self):
6✔
388
        return self.group == "apiextensions.k8s.io" and self.kind == "CustomResourceDefinition"
6✔
389

390
    def __str__(self):
6✔
391
        return f"{self.api_version}/{self.kind}/{self.name}{'.' + self.namespace if self.namespace else ''}"
6✔
392

393
    def get(self):
6✔
394
        rdef = self.rdef
6✔
395
        kwargs = {"name": self.name,
6✔
396
                  "_preload_content": False}
397
        if rdef.namespaced:
6✔
398
            kwargs["namespace"] = self.namespace
6✔
399
        return json.loads(self.rdef.get(**kwargs).data)
6✔
400

401
    def create(self, dry_run=True):
6✔
402
        rdef = self.rdef
6✔
403
        kwargs = {"body": self.manifest,
6✔
404
                  "_preload_content": False,
405
                  "field_manager": "kubernator",
406
                  }
407

408
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
409
        if self._k8s_client_version[0] > 22 and (self._k8s_field_validation_patched or not self.rdef.custom):
6✔
410
            kwargs["field_validation"] = self._k8s_field_validation
6✔
411
        if rdef.namespaced:
6✔
412
            kwargs["namespace"] = self.namespace
6✔
413
        if dry_run:
6✔
414
            kwargs["dry_run"] = "All"
6✔
415
        resp = rdef.create(**kwargs)
6✔
416
        self._process_response_headers(resp)
6✔
417
        return json.loads(resp.data)
6✔
418

419
    def patch(self, json_patch, *, patch_type: K8SResourcePatchType, force=False, dry_run=True):
6✔
420
        rdef = self.rdef
6✔
421
        kwargs = {"name": self.name,
6✔
422
                  "body": json_patch
423
                  if patch_type != K8SResourcePatchType.SERVER_SIDE_PATCH or self._k8s_client_version[0] > 24
424
                  else json.dumps(json_patch),
425
                  "_preload_content": False,
426
                  "field_manager": "kubernator",
427
                  }
428

429
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
430
        if self._k8s_client_version[0] > 22 and (self._k8s_field_validation_patched or not self.rdef.custom):
6✔
431
            kwargs["field_validation"] = self._k8s_field_validation
6✔
432
        if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
6!
433
            kwargs["force"] = force
6✔
434
        if rdef.namespaced:
6!
435
            kwargs["namespace"] = self.namespace
×
436
        if dry_run:
6!
437
            kwargs["dry_run"] = "All"
6✔
438

439
        def select_header_content_type_patch(content_types):
6✔
440
            if patch_type == K8SResourcePatchType.JSON_PATCH:
6!
441
                return "application/json-patch+json"
×
442
            if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
6!
443
                return "application/apply-patch+yaml"
6✔
444
            raise NotImplementedError
×
445

446
        if isinstance(rdef.patch, partial):
6!
447
            api_client = rdef.patch.func.__self__.api_client
×
448
        else:
449
            api_client = rdef.patch.__self__.api_client
6✔
450

451
        old_func = api_client.select_header_content_type
6✔
452
        try:
6✔
453
            api_client.select_header_content_type = select_header_content_type_patch
6✔
454
            resp = rdef.patch(**kwargs)
6✔
455
            self._process_response_headers(resp)
6✔
456
            return json.loads(resp.data)
6✔
457
        finally:
458
            api_client.select_header_content_type = old_func
6✔
459

460
    def delete(self, *, dry_run=True, propagation_policy=K8SPropagationPolicy.BACKGROUND):
6✔
461
        rdef = self.rdef
6✔
462
        kwargs = {"name": self.name,
6✔
463
                  "_preload_content": False,
464
                  "propagation_policy": propagation_policy.policy
465
                  }
466
        if rdef.namespaced:
6!
467
            kwargs["namespace"] = self.namespace
6✔
468
        if dry_run:
6✔
469
            kwargs["dry_run"] = "All"
6✔
470

471
        return json.loads(rdef.delete(**kwargs).data)
6✔
472

473
    @staticmethod
6✔
474
    def get_manifest_key(manifest):
6✔
475
        return K8SResourceKey(to_group_and_version(manifest["apiVersion"])[0],
6✔
476
                              manifest["kind"],
477
                              manifest["metadata"]["name"],
478
                              manifest["metadata"].get("namespace"))
479

480
    @staticmethod
6✔
481
    def get_manifest_description(manifest: dict, source=None):
6✔
482
        api_version = manifest.get("apiVersion")
6✔
483
        kind = manifest.get("kind")
6✔
484
        metadata = manifest.get("metadata")
6✔
485
        name = None
6✔
486
        namespace = None
6✔
487
        if metadata:
6!
488
            name = metadata.get("name")
6✔
489
            namespace = metadata.get("namespace")
6✔
490
        return (f"{api_version or 'unknown'}/{kind or '<unknown>'}/"
6✔
491
                f"{name or '<unknown>'}{'.' + namespace if namespace else ''}")
492

493
    def __eq__(self, other):
6✔
494
        if not isinstance(other, K8SResource):
×
495
            return False
×
496
        return self.key == other.key and self.manifest == other.manifest
×
497

498
    def _process_response_headers(self, resp):
6✔
499
        headers = resp.headers
6✔
500
        warn_headers = headers.get("Warning")
6✔
501
        if warn_headers:
6✔
502
            for warn in K8S_WARNING_HEADER.findall(warn_headers):
6✔
503
                code, _, msg, _ = warn
6✔
504
                code = int(code)
6✔
505
                msg = msg.encode("utf-8").decode("unicode_escape")
6✔
506
                if code == 299:
6!
507
                    self._api_warnings(self, msg)
6✔
508
                else:
509
                    self._logger.warning("Unknown API warning received for resource %s from %s: code %d: %s",
×
510
                                         self, self.source, code, msg)
511

512

513
class K8SResourcePluginMixin:
6✔
514
    def __init__(self):
6✔
515
        self.resource_definitions: MutableMapping[K8SResourceDefKey, K8SResourceDef] = {}
6✔
516
        self.resource_paths: MutableMapping[K8SResourceDefKey, MutableMapping[str, dict]] = {}
6✔
517
        self.resources: MutableMapping[K8SResourceKey, K8SResource] = {}
6✔
518

519
        self.resource_definitions_schema = None
6✔
520

521
    def add_resources(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
6✔
522
        if not source:
6!
523
            source = calling_frame_source()
×
524

525
        if isinstance(manifests, str):
6✔
526
            manifests = list(parse_yaml_docs(manifests, source))
6✔
527

528
        if isinstance(manifests, (Mapping, dict)):
6!
529
            return self.add_resource(manifests, source)
×
530
        else:
531
            return [self.add_resource(m, source) for m in manifests if m]
6✔
532

533
    def add_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
534
        if not source:
6!
535
            source = calling_frame_source()
×
536
        resource = self._create_resource(manifest, source)
6✔
537

538
        try:
6✔
539
            trans_resource = self._transform_resource(list(self.resources.values()), resource)
6✔
540
        except Exception as e:
×
541
            self.logger.error("An error occurred running transformers on %s", resource, exc_info=e)
×
542
            raise
×
543

544
        errors = list(self._validate_resource(trans_resource.manifest, source))
6✔
545
        if errors:
6!
546
            for error in errors:
×
547
                if source:
×
548
                    self.logger.error("Error detected in re-transformed K8S resource %s generated through %s",
×
549
                                      trans_resource, source, exc_info=error)
550
            raise errors[0]
×
551

552
        return self._add_resource(trans_resource, source)
6✔
553

554
    def add_crds(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
6✔
555
        if not source:
×
556
            source = calling_frame_source()
×
557

558
        if isinstance(manifests, str):
×
NEW
559
            manifests = list(parse_yaml_docs(manifests, source))
×
560

561
        if isinstance(manifests, (Mapping, dict)):
×
562
            return self.add_crd(manifests, source)
×
563
        else:
564
            return [self.add_crd(m, source) for m in manifests if m]
×
565

566
    def add_crd(self, manifest: dict, source: Union[str, Path] = None):
6✔
567
        if not source:
6!
568
            source = calling_frame_source()
×
569
        resource = self._create_resource(manifest, source)
6✔
570
        if not resource.is_crd:
6!
571
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
572
            raise ValueError(f"K8S manifest {resource_description} from {source} is not a CRD")
×
573

574
        self._add_crd(resource)
6✔
575
        return resource
6✔
576

577
    def create_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
578
        """Create K8S resource without adding it"""
579
        if not source:
×
580
            source = calling_frame_source()
×
581

582
        return self._create_resource(manifest, source)
×
583

584
    def add_local_resources(self, path: Path, file_type: FileType, source: str = None):
6✔
585
        manifests = load_file(self.logger, path, file_type)
×
586

587
        return [self.add_resource(m, source or path) for m in manifests if m]
×
588

589
    def add_remote_resources(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
6✔
590
                             source: str = None):
591
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
×
592

593
        return [self.add_resource(m, source or url) for m in manifests if m]
×
594

595
    def add_local_crds(self, path: Path, file_type: FileType, source: str = None):
6✔
596
        manifests = load_file(self.logger, path, file_type)
6✔
597

598
        return [self.add_crd(m, source or path) for m in manifests if m]
6✔
599

600
    def add_remote_crds(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
6✔
601
                        source: str = None):
602
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
6✔
603

604
        return [self.add_crd(m, source or url) for m in manifests if m]
6✔
605

606
    def get_api_versions(self):
6✔
607
        api_versions = set()
6✔
608
        for rdef in self.resource_definitions:
6✔
609
            api_version = f"{f'{rdef.group}/' if rdef.group else ''}{rdef.version}"
6✔
610
            if api_version not in api_versions:
6✔
611
                api_versions.add(api_version)
6✔
612
        return sorted(api_versions)
6✔
613

614
    def _create_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
615
        resource_description = K8SResource.get_manifest_description(manifest, source)
6✔
616

617
        new_manifest = self._patch_manifest(manifest, resource_description)
6✔
618
        if new_manifest != manifest:
6!
NEW
619
            manifest = new_manifest
×
NEW
620
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
621

622
        self.logger.debug("Validating K8S manifest for %s", resource_description)
6✔
623
        errors = list(self._validate_resource(manifest, source))
6✔
624
        if errors:
6!
625
            for error in errors:
×
626
                self.logger.error("Error detected in K8S manifest %s from %s: \n%s",
×
627
                                  resource_description, source or "<unknown>", yaml.safe_dump(manifest, None),
628
                                  exc_info=error)
629
            raise errors[0]
×
630

631
        rdef = self._get_manifest_rdef(manifest)
6✔
632
        return K8SResource(manifest, rdef, source)
6✔
633

634
    def _add_resource(self, resource: K8SResource, source):
6✔
635
        if resource.key in self.resources:
6!
636
            existing_resource = self.resources[resource.key]
×
637
            if resource != existing_resource:
×
638
                raise ValidationError("resource %s from %s already exists and was added from %s" %
×
639
                                      (resource.key, resource.source, existing_resource.source))
640
            self.logger.trace("K8S resource for %s from %s is already present and is identical", resource, source)
×
641
            return existing_resource
×
642

643
        self.logger.info("Adding K8S resource for %s from %s", resource, source)
6✔
644
        self.resources[resource.key] = resource
6✔
645

646
        if resource.is_crd:
6✔
647
            self._add_crd(resource)
6✔
648

649
        return resource
6✔
650

651
    def _patch_manifest(self,
6✔
652
                        manifest: dict,
653
                        resource_description: str):
654
        return manifest
6✔
655

656
    def _transform_resource(self,
6✔
657
                            resources: Sequence[K8SResource],
658
                            resource: K8SResource) -> K8SResource:
659
        return resource
6✔
660

661
    def _filter_resources(self, func: Callable[[K8SResource], bool]):
6✔
662
        yield from filter(func, self.resources.values())
×
663

664
    def _validate_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
665
        for error in self._yield_manifest_rdef(manifest):
6✔
666
            if isinstance(error, Exception):
6!
667
                yield error
×
668
            else:
669
                rdef = error
6✔
670
                k8s_validator = K8SValidator(rdef.schema,
6✔
671
                                             format_checker=k8s_format_checker)
672
                yield from k8s_validator.iter_errors(manifest)
6✔
673

674
    def _get_manifest_rdef(self, manifest):
6✔
675
        for error in self._yield_manifest_rdef(manifest):
6!
676
            if isinstance(error, Exception):
6!
677
                raise error
×
678
            else:
679
                return error
6✔
680

681
    def _yield_manifest_rdef(self, manifest):
6✔
682
        error = None
6✔
683
        for error in K8S_MINIMAL_RESOURCE_VALIDATOR.iter_errors(manifest):
6!
684
            yield error
×
685

686
        if error:
6!
687
            return
×
688

689
        key = K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]), manifest["kind"])
6✔
690

691
        try:
6✔
692
            yield self.resource_definitions[key]
6✔
693
        except KeyError:
6✔
694
            yield ValidationError("%s is not a defined Kubernetes resource" % (key,),
×
695
                                  validator=K8S_MINIMAL_RESOURCE_VALIDATOR,
696
                                  validator_value=key,
697
                                  instance=manifest,
698
                                  schema=K8S_MINIMAL_RESOURCE_SCHEMA)
699

700
    def _add_crd(self, resource: K8SResource):
6✔
701
        for crd in K8SResourceDef.from_resource(resource):
6✔
702
            self.logger.info("Adding K8S CRD definition %s", crd.key)
6✔
703
            self.resource_definitions[crd.key] = crd
6✔
704

705
    def _populate_resource_definitions(self):
6✔
706
        k8s_def = self.resource_definitions_schema
6✔
707

708
        def k8s_resource_def_key(v: Mapping[str, Union[list, Mapping]]) -> Iterable[K8SResourceDefKey]:
6✔
709
            gvks = v.get("x-kubernetes-group-version-kind")
6✔
710
            if gvks:
6✔
711
                if isinstance(gvks, Mapping):
6✔
712
                    gvk = gvks
6✔
713
                    yield K8SResourceDefKey(gvk["group"],
6✔
714
                                            gvk["version"],
715
                                            gvk["kind"])
716
                else:
717
                    for gvk in gvks:
6✔
718
                        yield K8SResourceDefKey(gvk["group"],
6✔
719
                                                gvk["version"],
720
                                                gvk["kind"])
721

722
        paths = k8s_def["paths"]
6✔
723
        for path, actions in paths.items():
6✔
724
            path_rdk = None
6✔
725
            path_actions = []
6✔
726
            for action, action_details in actions.items():
6✔
727
                if action == "parameters":
6✔
728
                    continue
6✔
729
                rdks = list(k8s_resource_def_key(action_details))
6✔
730
                if rdks:
6✔
731
                    assert len(rdks) == 1
6✔
732
                    rdk = rdks[0]
6✔
733
                    if path_rdk:
6✔
734
                        if path_rdk != rdk:
6!
735
                            raise ValueError(f"Encountered path action x-kubernetes-group-version-kind conflict: "
×
736
                                             f"{path}: {actions}")
737
                        path_actions.append(action_details["x-kubernetes-action"])
6✔
738
                    else:
739
                        path_rdk = rdk
6✔
740

741
            if path_rdk:
6✔
742
                rdef_paths = self.resource_paths.get(path_rdk)
6✔
743
                if not rdef_paths:
6✔
744
                    rdef_paths = {}
6✔
745
                    self.resource_paths[path_rdk] = rdef_paths
6✔
746
                rdef_paths[path] = actions
6✔
747

748
        for k, schema in k8s_def["definitions"].items():
6✔
749
            # This short-circuits the resolution of the references to the top of the document
750
            schema["definitions"] = k8s_def["definitions"]
6✔
751
            for key in k8s_resource_def_key(schema):
6✔
752
                for rdef in K8SResourceDef.from_manifest(key, schema, self.resource_paths):
6✔
753
                    self.resource_definitions[key] = rdef
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc