• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 18432987062

11 Oct 2025 06:10PM UTC coverage: 75.397% (+0.06%) from 75.342%
18432987062

push

github

web-flow
Merge pull request #85 from karellen/schema_format_validation_not_none

When validating schema format numbers (int32 etc) disallow None

627 of 983 branches covered (63.78%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

6 existing lines in 4 files now uncovered.

2456 of 3106 relevant lines covered (79.07%)

4.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.04
/src/main/python/kubernator/plugins/k8s_api.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import base64
6✔
20
import json
6✔
21
import re
6✔
22
from collections import namedtuple
6✔
23
from collections.abc import Callable, Mapping, MutableMapping, Sequence, Iterable
6✔
24
from enum import Enum, auto
6✔
25
from functools import partial
6✔
26
from io import StringIO
6✔
27
from pathlib import Path
6✔
28
from typing import Union, Optional
6✔
29

30
import yaml
6✔
31
from jsonschema._format import FormatChecker
6✔
32
from jsonschema._keywords import required
6✔
33
from jsonschema.exceptions import ValidationError
6✔
34
from jsonschema.validators import extend, Draft7Validator
6✔
35
from openapi_schema_validator import OAS31Validator
6✔
36

37
from kubernator.api import load_file, FileType, load_remote_file, calling_frame_source
6✔
38

39
K8S_WARNING_HEADER = re.compile(r'(?:,\s*)?(\d{3})\s+(\S+)\s+"(.+?)(?<!\\)"(?:\s+\"(.+?)(?<!\\)\")?\s*')
6✔
40
UPPER_FOLLOWED_BY_LOWER_RE = re.compile(r"(.)([A-Z][a-z]+)")
6✔
41
LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")
6✔
42

43
K8S_MINIMAL_RESOURCE_SCHEMA = {
6✔
44
    "properties": {
45
        "apiVersion": {
46
            "type": "string"
47
        },
48
        "kind": {
49
            "type": "string"
50
        },
51
        "metadata": {
52
            "type": "object",
53
            "properties": {
54
                "name": {
55
                    "type": "string"
56
                },
57
                "namespace": {
58
                    "type": "string"
59
                }
60
            },
61
            "required": ["name"]
62
        }
63
    },
64
    "type": "object",
65
    "required": ["apiVersion", "kind"]
66
}
67
K8S_MINIMAL_RESOURCE_VALIDATOR = Draft7Validator(K8S_MINIMAL_RESOURCE_SCHEMA)
6✔
68

69
CLUSTER_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}([^/]+)$")
6✔
70
NAMESPACED_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}namespaces/[^/]+/([^/]+)$")
6✔
71

72

73
class K8SResourcePatchType(Enum):
6✔
74
    JSON_PATCH = auto()
6✔
75
    SERVER_SIDE_PATCH = auto()
6✔
76

77

78
class K8SPropagationPolicy(Enum):
6✔
79
    BACKGROUND = ("Background",)
6✔
80
    FOREGROUND = ("Foreground",)
6✔
81
    ORPHAN = ("Orphan",)
6✔
82

83
    def __init__(self, policy):
6✔
84
        self.policy = policy
6✔
85

86

87
def is_integer(instance):
6✔
88
    # bool inherits from int, so ensure bools aren't reported as ints
89
    if isinstance(instance, bool):
6!
90
        return False
×
91
    return isinstance(instance, int)
6✔
92

93

94
def is_string(instance):
6✔
95
    return isinstance(instance, str)
6✔
96

97

98
def type_validator(validator, data_type, instance, schema):
6✔
99
    if instance is None:
6!
100
        return
×
101

102
    if data_type == "string" and schema.get("format") == "int-or-string":
6✔
103
        if not (is_string(instance) or is_integer(instance)):
6!
104
            yield ValidationError("%r is not of type %s" % (instance, "int-or-string"))
×
105
    elif not validator.is_type(instance, data_type):
6!
106
        yield ValidationError("%r is not of type %s" % (instance, data_type))
×
107

108

109
K8SValidator = extend(OAS31Validator, validators={
6✔
110
    "type": type_validator,
111
    "required": required
112
})
113

114
k8s_format_checker = FormatChecker()
6✔
115

116

117
@k8s_format_checker.checks("int32")
6✔
118
def check_int32(value):
6✔
119
    return value is not None and (-2147483648 < value < 2147483647)
6✔
120

121

122
@k8s_format_checker.checks("int64")
6✔
123
def check_int64(value):
6✔
124
    return value is not None and (-9223372036854775808 < value < 9223372036854775807)
6✔
125

126

127
@k8s_format_checker.checks("float")
6✔
128
def check_float(value):
6✔
NEW
129
    return value is not None and (-3.4E+38 < value < +3.4E+38)
×
130

131

132
@k8s_format_checker.checks("double")
6✔
133
def check_double(value):
6✔
134
    return value is not None and (-1.7E+308 < value < +1.7E+308)
6✔
135

136

137
@k8s_format_checker.checks("byte", ValueError)
6✔
138
def check_byte(value):
6✔
139
    if value is None:
×
140
        return False
×
141
    base64.b64decode(value, validate=True)
×
142
    return True
×
143

144

145
@k8s_format_checker.checks("int-or-string")
6✔
146
def check_int_or_string(value):
6✔
147
    return check_int32(value) if is_integer(value) else is_string(value)
6✔
148

149

150
def to_group_and_version(api_version):
6✔
151
    group, _, version = api_version.partition("/")
6✔
152
    if not version:
6✔
153
        version = group
6✔
154
        group = ""
6✔
155
    return group, version
6✔
156

157

158
def to_k8s_resource_def_key(manifest):
6✔
159
    return K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]),
×
160
                             manifest["kind"])
161

162

163
class K8SResourceDefKey(namedtuple("K8SResourceDefKey", ["group", "version", "kind"])):
6✔
164
    __slots__ = ()
6✔
165

166
    def __str__(self):
6✔
167
        return f"{self.group}{'/' if self.group else '/'}{self.version}/{self.kind}"
6✔
168

169

170
class K8SResourceDef:
6✔
171
    def __init__(self, key, singular, plural, namespaced, custom, schema):
6✔
172
        self.key = key
6✔
173
        self.singular = singular
6✔
174
        self.plural = plural
6✔
175
        self.namespaced = namespaced
6✔
176
        self.custom = custom
6✔
177
        self.schema = schema
6✔
178

179
        self._api_get = None
6✔
180
        self._api_create = None
6✔
181
        self._api_patch = None
6✔
182
        self._api_delete = None
6✔
183

184
    @property
6✔
185
    def group(self) -> str:
6✔
186
        return self.key.group
6✔
187

188
    @property
6✔
189
    def version(self) -> str:
6✔
190
        return self.key.version
6✔
191

192
    @property
6✔
193
    def kind(self) -> str:
6✔
194
        return self.key.kind
6✔
195

196
    @property
6✔
197
    def has_api(self) -> bool:
6✔
198
        return self.custom or self.plural
6✔
199

200
    @property
6✔
201
    def get(self):
6✔
202
        return self._api_get
6✔
203

204
    @property
6✔
205
    def create(self):
6✔
206
        return self._api_create
6✔
207

208
    @property
6✔
209
    def patch(self):
6✔
210
        return self._api_patch
6✔
211

212
    @property
6✔
213
    def delete(self):
6✔
214
        return self._api_delete
6✔
215

216
    def __eq__(self, o: object) -> bool:
6✔
217
        if not isinstance(o, K8SResourceDef):
×
218
            return False
×
219

220
        return (self.key == o.key and
×
221
                self.singular == o.singular and
222
                self.plural == o.plural and
223
                self.namespaced == o.namespaced and
224
                self.custom == o.custom)
225

226
    def __hash__(self) -> int:
6✔
227
        return self.key.__hash__()
×
228

229
    def __str__(self):
6✔
230
        return f"{self.key=}, {self.singular=}, {self.plural=}, {self.namespaced=}, {self.custom=}"
×
231

232
    @classmethod
6✔
233
    def from_manifest(cls, key: K8SResourceDefKey,
6✔
234
                      schema,
235
                      paths: Mapping[K8SResourceDefKey, Mapping[str, Mapping]]):
236
        singular = key.kind.lower()
6✔
237

238
        plural = None
6✔
239
        namespaced = False
6✔
240

241
        if singular == "namespace":
6✔
242
            plural = "namespaces"
6✔
243
        else:
244
            for path in paths.get(key, ()):
6✔
245
                if m := NAMESPACED_RESOURCE_PATH.fullmatch(path):
6✔
246
                    plural = m[1]
6✔
247
                    namespaced = True
6✔
248
                    break
6✔
249
                elif m := CLUSTER_RESOURCE_PATH.fullmatch(path):
6✔
250
                    plural = m[1]
6✔
251

252
        yield K8SResourceDef(key, singular, plural, namespaced, False, schema)
6✔
253

254
    @classmethod
6✔
255
    def from_resource(cls, resource: "K8SResource"):
6✔
256
        manifest = resource.manifest
6✔
257
        spec = manifest["spec"]
6✔
258
        group = spec["group"]
6✔
259
        names = spec["names"]
6✔
260
        kind = names["kind"]
6✔
261
        singular = names.get("singular", names["kind"].lower())
6✔
262
        plural = names["plural"]
6✔
263
        namespaced = spec["scope"] == "Namespaced"
6✔
264

265
        for version_spec in spec["versions"]:
6✔
266
            version = version_spec["name"]
6✔
267
            if resource.version == "v1":
6!
268
                schema = version_spec["schema"]["openAPIV3Schema"]
6✔
269
            else:
270
                schema = spec["validation"]["openAPIV3Schema"]
×
271
            yield K8SResourceDef(K8SResourceDefKey(group, version, kind), singular, plural, namespaced, True, schema)
6✔
272

273
    def populate_api(self, k8s_client_module, k8s_client):
6✔
274
        if not self.has_api:
6!
275
            raise RuntimeError(f"{self} has no API")
×
276

277
        if self._api_get:
6✔
278
            return
6✔
279

280
        group = self.group or "core"
6✔
281
        version = self.version
6✔
282
        kind = self.kind
6✔
283

284
        if self.custom:
6✔
285
            k8s_api = k8s_client_module.CustomObjectsApi(k8s_client)
6✔
286

287
            kwargs = {"group": group,
6✔
288
                      "version": version,
289
                      "plural": self.plural}
290
            if self.namespaced:
6!
291
                self._api_get = partial(k8s_api.get_namespaced_custom_object, **kwargs)
6✔
292
                self._api_patch = partial(k8s_api.patch_namespaced_custom_object, **kwargs)
6✔
293
                self._api_create = partial(k8s_api.create_namespaced_custom_object, **kwargs)
6✔
294
                self._api_delete = partial(k8s_api.delete_namespaced_custom_object, **kwargs)
6✔
295
            else:
296
                self._api_get = partial(k8s_api.get_cluster_custom_object, **kwargs)
×
297
                self._api_patch = partial(k8s_api.patch_cluster_custom_object, **kwargs)
×
298
                self._api_create = partial(k8s_api.create_cluster_custom_object, **kwargs)
×
299
                self._api_delete = partial(k8s_api.delete_cluster_custom_object, **kwargs)
×
300
        else:
301
            # Take care for the case e.g. api_type is "apiextensions.k8s.io"
302
            # Only replace the last instance
303
            group = "".join(group.rsplit(".k8s.io", 1))
6✔
304

305
            # convert group name from DNS subdomain format to
306
            # python class name convention
307
            group = "".join(word.capitalize() for word in group.split('.'))
6✔
308
            fcn_to_call = f"{group}{version.capitalize()}Api"
6✔
309
            k8s_api = getattr(k8s_client_module, fcn_to_call)(k8s_client)
6✔
310

311
            # Replace CamelCased action_type into snake_case
312
            kind = UPPER_FOLLOWED_BY_LOWER_RE.sub(r"\1_\2", kind)
6✔
313
            kind = LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE.sub(r"\1_\2", kind).lower()
6✔
314

315
            if self.namespaced:
6✔
316
                self._api_get = getattr(k8s_api, f"read_namespaced_{kind}")
6✔
317
                self._api_patch = getattr(k8s_api, f"patch_namespaced_{kind}")
6✔
318
                self._api_create = getattr(k8s_api, f"create_namespaced_{kind}")
6✔
319
                self._api_delete = getattr(k8s_api, f"delete_namespaced_{kind}")
6✔
320
            else:
321
                self._api_get = getattr(k8s_api, f"read_{kind}")
6✔
322
                self._api_patch = getattr(k8s_api, f"patch_{kind}")
6✔
323
                self._api_create = getattr(k8s_api, f"create_{kind}")
6✔
324
                self._api_delete = getattr(k8s_api, f"delete_{kind}")
6✔
325

326

327
class K8SResourceKey(namedtuple("K8SResourceKey", ["group", "kind", "name", "namespace"])):
6✔
328
    __slots__ = ()
6✔
329

330
    def __str__(self):
6✔
331
        return (f"{self.group}{'/' if self.group else 'v1/'}{self.kind}"
×
332
                f"/{self.name}{f'.{self.namespace}' if self.namespace else ''}")
333

334

335
class K8SResource:
6✔
336
    _k8s_client_version = None
6✔
337
    _k8s_field_validation = None
6✔
338
    _k8s_field_validation_patched = None
6✔
339
    _logger = None
6✔
340
    _api_warnings = None
6✔
341

342
    def __init__(self, manifest: dict, rdef: K8SResourceDef, source: Union[str, Path] = None):
6✔
343
        self.key = self.get_manifest_key(manifest)
6✔
344

345
        self.manifest = manifest
6✔
346
        self.rdef = rdef
6✔
347
        self.source = source
6✔
348

349
    @property
6✔
350
    def group(self) -> str:
6✔
351
        return self.key.group
6✔
352

353
    @property
6✔
354
    def version(self) -> str:
6✔
355
        return self.rdef.version
6✔
356

357
    @property
6✔
358
    def kind(self) -> str:
6✔
359
        return self.key.kind
6✔
360

361
    @property
6✔
362
    def name(self) -> str:
6✔
363
        return self.key.name
6✔
364

365
    @name.setter
6✔
366
    def name(self, value):
6✔
367
        self.manifest["metadata"]["name"] = value
×
368
        self.key = self.get_manifest_key(self.manifest)
×
369

370
    @property
6✔
371
    def namespace(self) -> Optional[str]:
6✔
372
        return self.key.namespace
6✔
373

374
    @namespace.setter
6✔
375
    def namespace(self, value):
6✔
376
        self.manifest["metadata"]["namespace"] = value
×
377
        self.key = self.get_manifest_key(self.manifest)
×
378

379
    @property
6✔
380
    def api_version(self) -> str:
6✔
381
        return self.manifest["apiVersion"]
6✔
382

383
    @property
6✔
384
    def schema(self) -> dict:
6✔
385
        return self.rdef.schema
×
386

387
    @property
6✔
388
    def is_crd(self):
6✔
389
        return self.group == "apiextensions.k8s.io" and self.kind == "CustomResourceDefinition"
6✔
390

391
    def __str__(self):
6✔
392
        return f"{self.api_version}/{self.kind}/{self.name}{'.' + self.namespace if self.namespace else ''}"
6✔
393

394
    def get(self):
6✔
395
        rdef = self.rdef
6✔
396
        kwargs = {"name": self.name,
6✔
397
                  "_preload_content": False}
398
        if rdef.namespaced:
6✔
399
            kwargs["namespace"] = self.namespace
6✔
400
        return json.loads(self.rdef.get(**kwargs).data)
6✔
401

402
    def create(self, dry_run=True):
6✔
403
        rdef = self.rdef
6✔
404
        kwargs = {"body": self.manifest,
6✔
405
                  "_preload_content": False,
406
                  "field_manager": "kubernator",
407
                  }
408

409
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
410
        if self._k8s_client_version[0] > 22 and (self._k8s_field_validation_patched or not self.rdef.custom):
6✔
411
            kwargs["field_validation"] = self._k8s_field_validation
6✔
412
        if rdef.namespaced:
6✔
413
            kwargs["namespace"] = self.namespace
6✔
414
        if dry_run:
6✔
415
            kwargs["dry_run"] = "All"
6✔
416
        resp = rdef.create(**kwargs)
6✔
417
        self._process_response_headers(resp)
6✔
418
        return json.loads(resp.data)
6✔
419

420
    def patch(self, json_patch, *, patch_type: K8SResourcePatchType, force=False, dry_run=True):
6✔
421
        rdef = self.rdef
6✔
422
        kwargs = {"name": self.name,
6✔
423
                  "body": json_patch
424
                  if patch_type != K8SResourcePatchType.SERVER_SIDE_PATCH or self._k8s_client_version[0] > 24
425
                  else json.dumps(json_patch),
426
                  "_preload_content": False,
427
                  "field_manager": "kubernator",
428
                  }
429

430
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
431
        if self._k8s_client_version[0] > 22 and (self._k8s_field_validation_patched or not self.rdef.custom):
6✔
432
            kwargs["field_validation"] = self._k8s_field_validation
6✔
433
        if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
6!
434
            kwargs["force"] = force
6✔
435
        if rdef.namespaced:
6!
436
            kwargs["namespace"] = self.namespace
×
437
        if dry_run:
6!
438
            kwargs["dry_run"] = "All"
6✔
439

440
        def select_header_content_type_patch(content_types):
6✔
441
            if patch_type == K8SResourcePatchType.JSON_PATCH:
6!
442
                return "application/json-patch+json"
×
443
            if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
6!
444
                return "application/apply-patch+yaml"
6✔
445
            raise NotImplementedError
×
446

447
        if isinstance(rdef.patch, partial):
6!
448
            api_client = rdef.patch.func.__self__.api_client
×
449
        else:
450
            api_client = rdef.patch.__self__.api_client
6✔
451

452
        old_func = api_client.select_header_content_type
6✔
453
        try:
6✔
454
            api_client.select_header_content_type = select_header_content_type_patch
6✔
455
            resp = rdef.patch(**kwargs)
6✔
456
            self._process_response_headers(resp)
6✔
457
            return json.loads(resp.data)
6✔
458
        finally:
459
            api_client.select_header_content_type = old_func
6✔
460

461
    def delete(self, *, dry_run=True, propagation_policy=K8SPropagationPolicy.BACKGROUND):
6✔
462
        rdef = self.rdef
6✔
463
        kwargs = {"name": self.name,
6✔
464
                  "_preload_content": False,
465
                  "propagation_policy": propagation_policy.policy
466
                  }
467
        if rdef.namespaced:
6!
468
            kwargs["namespace"] = self.namespace
6✔
469
        if dry_run:
6✔
470
            kwargs["dry_run"] = "All"
6✔
471

472
        return json.loads(rdef.delete(**kwargs).data)
6✔
473

474
    @staticmethod
6✔
475
    def get_manifest_key(manifest):
6✔
476
        return K8SResourceKey(to_group_and_version(manifest["apiVersion"])[0],
6✔
477
                              manifest["kind"],
478
                              manifest["metadata"]["name"],
479
                              manifest["metadata"].get("namespace"))
480

481
    @staticmethod
6✔
482
    def get_manifest_description(manifest: dict, source=None):
6✔
483
        api_version = manifest.get("apiVersion")
6✔
484
        kind = manifest.get("kind")
6✔
485
        metadata = manifest.get("metadata")
6✔
486
        name = None
6✔
487
        namespace = None
6✔
488
        if metadata:
6!
489
            name = metadata.get("name")
6✔
490
            namespace = metadata.get("namespace")
6✔
491
        return (f"{api_version or 'unknown'}/{kind or '<unknown>'}/"
6✔
492
                f"{name or '<unknown>'}{'.' + namespace if namespace else ''}")
493

494
    def __eq__(self, other):
6✔
495
        if not isinstance(other, K8SResource):
×
496
            return False
×
497
        return self.key == other.key and self.manifest == other.manifest
×
498

499
    def _process_response_headers(self, resp):
6✔
500
        headers = resp.headers
6✔
501
        warn_headers = headers.get("Warning")
6✔
502
        if warn_headers:
6✔
503
            for warn in K8S_WARNING_HEADER.findall(warn_headers):
6✔
504
                code, _, msg, _ = warn
6✔
505
                code = int(code)
6✔
506
                msg = msg.encode("utf-8").decode("unicode_escape")
6✔
507
                if code == 299:
6!
508
                    self._api_warnings(self, msg)
6✔
509
                else:
510
                    self._logger.warning("Unknown API warning received for resource %s from %s: code %d: %s",
×
511
                                         self, self.source, code, msg)
512

513

514
class K8SResourcePluginMixin:
6✔
515
    def __init__(self):
6✔
516
        self.resource_definitions: MutableMapping[K8SResourceDefKey, K8SResourceDef] = {}
6✔
517
        self.resource_paths: MutableMapping[K8SResourceDefKey, MutableMapping[str, dict]] = {}
6✔
518
        self.resources: MutableMapping[K8SResourceKey, K8SResource] = {}
6✔
519

520
        self.resource_definitions_schema = None
6✔
521

522
    def add_resources(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
6✔
523
        if not source:
6!
524
            source = calling_frame_source()
×
525

526
        if isinstance(manifests, str):
6✔
527
            manifests = list(yaml.safe_load_all(StringIO(manifests)))
6✔
528

529
        if isinstance(manifests, (Mapping, dict)):
6!
530
            return self.add_resource(manifests, source)
×
531
        else:
532
            return [self.add_resource(m, source) for m in manifests if m]
6✔
533

534
    def add_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
535
        if not source:
6!
536
            source = calling_frame_source()
×
537
        resource = self._create_resource(manifest, source)
6✔
538

539
        try:
6✔
540
            trans_resource = self._transform_resource(list(self.resources.values()), resource)
6✔
541
        except Exception as e:
×
542
            self.logger.error("An error occurred running transformers on %s", resource, exc_info=e)
×
543
            raise
×
544

545
        errors = list(self._validate_resource(trans_resource.manifest, source))
6✔
546
        if errors:
6!
547
            for error in errors:
×
548
                if source:
×
549
                    self.logger.error("Error detected in re-transformed K8S resource %s generated through %s",
×
550
                                      trans_resource, source, exc_info=error)
551
            raise errors[0]
×
552

553
        return self._add_resource(trans_resource, source)
6✔
554

555
    def add_crds(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
6✔
556
        if not source:
×
557
            source = calling_frame_source()
×
558

559
        if isinstance(manifests, str):
×
560
            manifests = list(yaml.safe_load_all(StringIO(manifests)))
×
561

562
        if isinstance(manifests, (Mapping, dict)):
×
563
            return self.add_crd(manifests, source)
×
564
        else:
565
            return [self.add_crd(m, source) for m in manifests if m]
×
566

567
    def add_crd(self, manifest: dict, source: Union[str, Path] = None):
6✔
568
        if not source:
6!
569
            source = calling_frame_source()
×
570
        resource = self._create_resource(manifest, source)
6✔
571
        if not resource.is_crd:
6!
572
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
573
            raise ValueError(f"K8S manifest {resource_description} from {source} is not a CRD")
×
574

575
        self._add_crd(resource)
6✔
576
        return resource
6✔
577

578
    def create_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
579
        """Create K8S resource without adding it"""
580
        if not source:
×
581
            source = calling_frame_source()
×
582

583
        return self._create_resource(manifest, source)
×
584

585
    def add_local_resources(self, path: Path, file_type: FileType, source: str = None):
6✔
586
        manifests = load_file(self.logger, path, file_type)
×
587

588
        return [self.add_resource(m, source or path) for m in manifests if m]
×
589

590
    def add_remote_resources(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
6✔
591
                             source: str = None):
592
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
×
593

594
        return [self.add_resource(m, source or url) for m in manifests if m]
×
595

596
    def add_local_crds(self, path: Path, file_type: FileType, source: str = None):
6✔
597
        manifests = load_file(self.logger, path, file_type)
6✔
598

599
        return [self.add_crd(m, source or path) for m in manifests if m]
6✔
600

601
    def add_remote_crds(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
6✔
602
                        source: str = None):
603
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
6✔
604

605
        return [self.add_crd(m, source or url) for m in manifests if m]
6✔
606

607
    def get_api_versions(self):
6✔
608
        api_versions = set()
6✔
609
        for rdef in self.resource_definitions:
6✔
610
            api_version = f"{f'{rdef.group}/' if rdef.group else ''}{rdef.version}"
6✔
611
            if api_version not in api_versions:
6✔
612
                api_versions.add(api_version)
6✔
613
        return sorted(api_versions)
6✔
614

615
    def _create_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
616
        resource_description = K8SResource.get_manifest_description(manifest, source)
6✔
617
        self.logger.debug("Validating K8S manifest for %s", resource_description)
6✔
618

619
        errors = list(self._validate_resource(manifest, source))
6✔
620
        if errors:
6!
621
            for error in errors:
×
NEW
622
                self.logger.error("Error detected in K8S manifest %s from %s: \n%s",
×
623
                                  resource_description, source or "<unknown>", yaml.safe_dump(manifest, None),
624
                                  exc_info=error)
UNCOV
625
            raise errors[0]
×
626

627
        rdef = self._get_manifest_rdef(manifest)
6✔
628
        return K8SResource(manifest, rdef, source)
6✔
629

630
    def _add_resource(self, resource: K8SResource, source):
6✔
631
        if resource.key in self.resources:
6!
632
            existing_resource = self.resources[resource.key]
×
633
            if resource != existing_resource:
×
634
                raise ValidationError("resource %s from %s already exists and was added from %s" %
×
635
                                      (resource.key, resource.source, existing_resource.source))
636
            self.logger.trace("K8S resource for %s from %s is already present and is identical", resource, source)
×
637
            return existing_resource
×
638

639
        self.logger.info("Adding K8S resource for %s from %s", resource, source)
6✔
640
        self.resources[resource.key] = resource
6✔
641

642
        if resource.is_crd:
6✔
643
            self._add_crd(resource)
6✔
644

645
        return resource
6✔
646

647
    def _transform_resource(self,
6✔
648
                            resources: Sequence[K8SResource],
649
                            resource: K8SResource) -> K8SResource:
650
        return resource
6✔
651

652
    def _filter_resources(self, func: Callable[[K8SResource], bool]):
6✔
653
        yield from filter(func, self.resources.values())
×
654

655
    def _validate_resource(self, manifest: dict, source: Union[str, Path] = None):
6✔
656
        for error in self._yield_manifest_rdef(manifest):
6✔
657
            if isinstance(error, Exception):
6!
658
                yield error
×
659
            else:
660
                rdef = error
6✔
661
                k8s_validator = K8SValidator(rdef.schema,
6✔
662
                                             format_checker=k8s_format_checker)
663
                yield from k8s_validator.iter_errors(manifest)
6✔
664

665
    def _get_manifest_rdef(self, manifest):
6✔
666
        for error in self._yield_manifest_rdef(manifest):
6!
667
            if isinstance(error, Exception):
6!
668
                raise error
×
669
            else:
670
                return error
6✔
671

672
    def _yield_manifest_rdef(self, manifest):
6✔
673
        error = None
6✔
674
        for error in K8S_MINIMAL_RESOURCE_VALIDATOR.iter_errors(manifest):
6!
675
            yield error
×
676

677
        if error:
6!
678
            return
×
679

680
        key = K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]), manifest["kind"])
6✔
681

682
        try:
6✔
683
            yield self.resource_definitions[key]
6✔
684
        except KeyError:
6✔
685
            yield ValidationError("%s is not a defined Kubernetes resource" % (key,),
×
686
                                  validator=K8S_MINIMAL_RESOURCE_VALIDATOR,
687
                                  validator_value=key,
688
                                  instance=manifest,
689
                                  schema=K8S_MINIMAL_RESOURCE_SCHEMA)
690

691
    def _add_crd(self, resource: K8SResource):
6✔
692
        for crd in K8SResourceDef.from_resource(resource):
6✔
693
            self.logger.info("Adding K8S CRD definition %s", crd.key)
6✔
694
            self.resource_definitions[crd.key] = crd
6✔
695

696
    def _populate_resource_definitions(self):
6✔
697
        k8s_def = self.resource_definitions_schema
6✔
698

699
        def k8s_resource_def_key(v: Mapping[str, Union[list, Mapping]]) -> Iterable[K8SResourceDefKey]:
6✔
700
            gvks = v.get("x-kubernetes-group-version-kind")
6✔
701
            if gvks:
6✔
702
                if isinstance(gvks, Mapping):
6✔
703
                    gvk = gvks
6✔
704
                    yield K8SResourceDefKey(gvk["group"],
6✔
705
                                            gvk["version"],
706
                                            gvk["kind"])
707
                else:
708
                    for gvk in gvks:
6✔
709
                        yield K8SResourceDefKey(gvk["group"],
6✔
710
                                                gvk["version"],
711
                                                gvk["kind"])
712

713
        paths = k8s_def["paths"]
6✔
714
        for path, actions in paths.items():
6✔
715
            path_rdk = None
6✔
716
            path_actions = []
6✔
717
            for action, action_details in actions.items():
6✔
718
                if action == "parameters":
6✔
719
                    continue
6✔
720
                rdks = list(k8s_resource_def_key(action_details))
6✔
721
                if rdks:
6✔
722
                    assert len(rdks) == 1
6✔
723
                    rdk = rdks[0]
6✔
724
                    if path_rdk:
6✔
725
                        if path_rdk != rdk:
6!
726
                            raise ValueError(f"Encountered path action x-kubernetes-group-version-kind conflict: "
×
727
                                             f"{path}: {actions}")
728
                        path_actions.append(action_details["x-kubernetes-action"])
6✔
729
                    else:
730
                        path_rdk = rdk
6✔
731

732
            if path_rdk:
6✔
733
                rdef_paths = self.resource_paths.get(path_rdk)
6✔
734
                if not rdef_paths:
6✔
735
                    rdef_paths = {}
6✔
736
                    self.resource_paths[path_rdk] = rdef_paths
6✔
737
                rdef_paths[path] = actions
6✔
738

739
        for k, schema in k8s_def["definitions"].items():
6✔
740
            # This short-circuits the resolution of the references to the top of the document
741
            schema["definitions"] = k8s_def["definitions"]
6✔
742
            for key in k8s_resource_def_key(schema):
6✔
743
                for rdef in K8SResourceDef.from_manifest(key, schema, self.resource_paths):
6✔
744
                    self.resource_definitions[key] = rdef
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc