karellen / kubernator / 24621395421

19 Apr 2026 04:58AM UTC coverage: 81.992% (+4.1%) from 77.939%
24621395421 · push · github · web-flow
Add OpenAPI v3 validation with CEL rules; extract validator into k8s_schema (#104)

## Summary

Extracts all OpenAPI validation out of `k8s.py` / `k8s_api.py` into a
new `kubernator/plugins/k8s_schema/` package, and adds an OpenAPI v3
validator with client-side CEL rule evaluation as the default on
clusters ≥ 1.27.

- **`k8s_api.py` no longer does validation** — `K8SResourcePluginMixin`
holds a single `self.validator` and delegates every manifest check
(minimal envelope → rdef lookup → full schema → CEL rules) to it.
Dependency graph is one-way: `k8s_schema → k8s_api`.
- **Two validators behind a factory.** `SwaggerV2Validator` preserves
current behaviour (swagger.json + OAS31). `OpenAPIV3Validator` is new:
cluster-first `/openapi/v3` discovery with GitHub fallback, lazy
per-group fetch, transitive `$ref` closure across group documents.
- **K8s extensions enforced client-side** under v3:
`x-kubernetes-list-type` (`atomic` / `set` / `map` with
`list-map-keys`), `-preserve-unknown-fields`, `-embedded-resource`,
`-int-or-string`.
- **CEL rules evaluated client-side.** Lazy-compiled, cached by rule
text for the run. Ports of the K8s CEL libraries — `lists`, `regex`,
`format.named(...)`, `quantity`, `IP`, `CIDR` — plus a compatibility
shim for `optional.of` / `optional.none` / `hasValue` / `value` /
`orValue` so `optionalSelf` / `optionalOldSelf` work. Transition rules
fire in the apply path against the cluster's current state.
- **Selection.** `ktor.k8s.register()` gains `openapi_version` (`auto` /
`v2` / `v3`) and `openapi_source` (`auto` / `cluster` / `github`) kwargs
(also readable from context). `auto` gates on server minor ≥ 1.27 and
falls back to v2 on any v3 failure. `istio.py` is migrated to the same
factory.

New dep: `cel-python ~=0.5`.
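
For readers unfamiliar with the `x-kubernetes-list-type` semantics listed in the summary, the uniqueness checks might look something like the sketch below. It is not the code from `k8s_schema`: `iter_list_type_errors` is a hypothetical name, and treating a missing map key as an error is a simplification (defaulted keys are not handled here).

```python
import json
from typing import Any, Iterator, Sequence


def iter_list_type_errors(items: Sequence[Any], list_type: str,
                          map_keys: Sequence[str] = ()) -> Iterator[str]:
    """Yield one message per violation of the declared list type."""
    if list_type == "atomic":
        return  # atomic lists carry no uniqueness constraint
    if list_type not in ("set", "map"):
        raise ValueError(f"unknown list type: {list_type!r}")
    seen = set()
    for i, item in enumerate(items):
        if list_type == "set":
            # Set items must be unique as whole values
            ident = json.dumps(item, sort_keys=True)
        else:
            # Map items must be unique by the combination of their key fields
            missing = [k for k in map_keys if k not in item]
            if missing:
                yield f"[{i}]: missing list-map key(s) {missing}"
                continue
            ident = json.dumps([item[k] for k in map_keys], sort_keys=True)
        if ident in seen:
            yield f"[{i}]: duplicate {list_type} entry {ident}"
        seen.add(ident)


ports = [{"name": "http", "port": 80}, {"name": "http", "port": 8080}]
errors = list(iter_list_type_errors(ports, "map", ["name"]))
```

Serialising each identity with `json.dumps(..., sort_keys=True)` lets unhashable values such as nested objects be compared without a custom hashing scheme.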

## Test plan

- [x] 111 unit tests (v2 parity; v3 lazy fetch; api_versions without
sub-doc fetch; source fallback; every extension keyword; CEL walker +
evaluator; 7 extension libraries; factory selection / vers... (continued)

943 of 1328 branches covered (71.01%)

Branch coverage included in aggregate %.

1040 of 1082 new or added lines in 18 files covered. (96.12%)

6 existing lines in 2 files now uncovered.

3979 of 4675 relevant lines covered (85.11%)

4.25 hits per line

Source File

80.45
/src/main/python/kubernator/plugins/k8s_api.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import json
5✔
20
import re
5✔
21
from collections import namedtuple
5✔
22
from collections.abc import Callable, Mapping, MutableMapping, Sequence
5✔
23
from enum import Enum, auto
5✔
24
from functools import partial, wraps
5✔
25
from pathlib import Path
5✔
26
from typing import Union, Optional
5✔
27

28
import yaml
5✔
29
from jsonschema.exceptions import ValidationError
5✔
30

31
from kubernator.api import load_file, FileType, load_remote_file, calling_frame_source, parse_yaml_docs
5✔
32

33

34
def api_exc_normalize_body(e):
5✔
35
    """Parse a raw ApiException body (JSON or YAML) into a Python object in-place.
36

37
    Idempotent: if e.body is already a parsed object, it is left unchanged.
38
    """
39
    if not isinstance(e.body, (str, bytes)):
5!
40
        return
×
41
    if e.headers and "content-type" in e.headers:
5!
42
        content_type = e.headers["content-type"]
5✔
43
        if content_type == "application/json" or content_type.endswith("+json"):
5!
44
            e.body = json.loads(e.body)
5✔
45
        elif (content_type in ("application/yaml", "application/x-yaml", "text/yaml",
×
46
                               "text/x-yaml") or content_type.endswith("+yaml")):
47
            e.body = yaml.safe_load(e.body)
×
48

49

50
def api_exc_format_body(e):
5✔
51
    """Format an ApiException body back to a string for human-readable display.
52

53
    Idempotent: if e.body is already a string/bytes, it is left unchanged.
54
    """
55
    if not isinstance(e.body, (str, bytes)):
5!
56
        e.body = json.dumps(e.body, indent=4)
5✔
57

58

59
def _normalize_api_exc(func):
5✔
60
    """Decorator: normalize ApiException body before propagating."""
61
    @wraps(func)
5✔
62
    def wrapper(*args, **kwargs):
5✔
63
        from kubernetes.client import ApiException
5✔
64
        try:
5✔
65
            return func(*args, **kwargs)
5✔
66
        except ApiException as e:
5✔
67
            api_exc_normalize_body(e)
5✔
68
            raise
5✔
69
    return wrapper
5✔
70

71

72
K8S_WARNING_HEADER = re.compile(r'(?:,\s*)?(\d{3})\s+(\S+)\s+"(.+?)(?<!\\)"(?:\s+\"(.+?)(?<!\\)\")?\s*')
5✔
73
UPPER_FOLLOWED_BY_LOWER_RE = re.compile(r"(.)([A-Z][a-z]+)")
5✔
74
LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")
5✔
75

76
CLUSTER_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}([^/]+)$")
5✔
77
NAMESPACED_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}namespaces/[^/]+/([^/]+)$")
5✔
78

79

80
class K8SResourcePatchType(Enum):
5✔
81
    JSON_PATCH = auto()
5✔
82
    SERVER_SIDE_PATCH = auto()
5✔
83

84

85
class K8SPropagationPolicy(Enum):
5✔
86
    BACKGROUND = ("Background",)
5✔
87
    FOREGROUND = ("Foreground",)
5✔
88
    ORPHAN = ("Orphan",)
5✔
89

90
    def __init__(self, policy):
5✔
91
        self.policy = policy
5✔
92

93

94
def to_group_and_version(api_version):
5✔
95
    group, _, version = api_version.partition("/")
5✔
96
    if not version:
5✔
97
        version = group
5✔
98
        group = ""
5✔
99
    return group, version
5✔
100

101

102
def to_k8s_resource_def_key(manifest):
5✔
103
    return K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]),
×
104
                             manifest["kind"])
105

106

107
class K8SResourceDefKey(namedtuple("K8SResourceDefKey", ["group", "version", "kind"])):
5✔
108
    __slots__ = ()
5✔
109

110
    def __str__(self):
5✔
111
        return f"{self.group}{'/' if self.group else '/'}{self.version}/{self.kind}"
5✔
112

113

114
class K8SResourceDef:
5✔
115
    def __init__(self, key, singular, plural, namespaced, custom, schema):
5✔
116
        self.key = key
5✔
117
        self.singular = singular
5✔
118
        self.plural = plural
5✔
119
        self.namespaced = namespaced
5✔
120
        self.custom = custom
5✔
121
        self.schema = schema
5✔
122

123
        self._api_get = None
5✔
124
        self._api_create = None
5✔
125
        self._api_patch = None
5✔
126
        self._api_delete = None
5✔
127
        self._api_list = None
5✔
128

129
    @property
5✔
130
    def group(self) -> str:
5✔
131
        return self.key.group
5✔
132

133
    @property
5✔
134
    def version(self) -> str:
5✔
135
        return self.key.version
5✔
136

137
    @property
5✔
138
    def kind(self) -> str:
5✔
139
        return self.key.kind
5✔
140

141
    @property
5✔
142
    def has_api(self) -> bool:
5✔
143
        return self.custom or self.plural
5✔
144

145
    @property
5✔
146
    def get(self):
5✔
147
        return self._api_get
5✔
148

149
    @property
5✔
150
    def create(self):
5✔
151
        return self._api_create
5✔
152

153
    @property
5✔
154
    def patch(self):
5✔
155
        return self._api_patch
5✔
156

157
    @property
5✔
158
    def delete(self):
5✔
159
        return self._api_delete
5✔
160

161
    @property
5✔
162
    def list(self):
5✔
163
        return self._api_list
5✔
164

165
    def __eq__(self, o: object) -> bool:
5✔
166
        if not isinstance(o, K8SResourceDef):
×
167
            return False
×
168

169
        return (self.key == o.key and
×
170
                self.singular == o.singular and
171
                self.plural == o.plural and
172
                self.namespaced == o.namespaced and
173
                self.custom == o.custom)
174

175
    def __hash__(self) -> int:
5✔
176
        return self.key.__hash__()
×
177

178
    def __str__(self):
5✔
179
        return f"{self.key=}, {self.singular=}, {self.plural=}, {self.namespaced=}, {self.custom=}"
×
180

181
    @classmethod
5✔
182
    def from_manifest(cls, key: K8SResourceDefKey,
5✔
183
                      schema,
184
                      paths: Mapping[K8SResourceDefKey, Mapping[str, Mapping]]):
185
        singular = key.kind.lower()
5✔
186

187
        plural = None
5✔
188
        namespaced = False
5✔
189

190
        if singular == "namespace":
5✔
191
            plural = "namespaces"
5✔
192
        else:
193
            for path in paths.get(key, ()):
5✔
194
                if m := NAMESPACED_RESOURCE_PATH.fullmatch(path):
5✔
195
                    plural = m[1]
5✔
196
                    namespaced = True
5✔
197
                    break
5✔
198
                elif m := CLUSTER_RESOURCE_PATH.fullmatch(path):
5✔
199
                    plural = m[1]
5✔
200

201
        yield K8SResourceDef(key, singular, plural, namespaced, False, schema)
5✔
202

203
    @classmethod
5✔
204
    def from_resource(cls, resource: "K8SResource"):
5✔
205
        manifest = resource.manifest
5✔
206
        spec = manifest["spec"]
5✔
207
        group = spec["group"]
5✔
208
        names = spec["names"]
5✔
209
        kind = names["kind"]
5✔
210
        singular = names.get("singular", names["kind"].lower())
5✔
211
        plural = names["plural"]
5✔
212
        namespaced = spec["scope"] == "Namespaced"
5✔
213

214
        for version_spec in spec["versions"]:
5✔
215
            version = version_spec["name"]
5✔
216
            if resource.version == "v1":
5!
217
                schema = version_spec["schema"]["openAPIV3Schema"]
5✔
218
            else:
219
                schema = spec["validation"]["openAPIV3Schema"]
×
220
            yield K8SResourceDef(K8SResourceDefKey(group, version, kind), singular, plural, namespaced, True, schema)
5✔
221

222
    def populate_api(self, k8s_client_module, k8s_client):
5✔
223
        if not self.has_api:
5!
224
            raise RuntimeError(f"{self} has no API")
×
225

226
        if self._api_get:
5✔
227
            return
5✔
228

229
        group = self.group or "core"
5✔
230
        version = self.version
5✔
231
        kind = self.kind
5✔
232

233
        if self.custom:
5✔
234
            k8s_api = k8s_client_module.CustomObjectsApi(k8s_client)
5✔
235

236
            kwargs = {"group": group,
5✔
237
                      "version": version,
238
                      "plural": self.plural}
239
            if self.namespaced:
5!
240
                self._api_get = partial(k8s_api.get_namespaced_custom_object, **kwargs)
5✔
241
                self._api_patch = partial(k8s_api.patch_namespaced_custom_object, **kwargs)
5✔
242
                self._api_create = partial(k8s_api.create_namespaced_custom_object, **kwargs)
5✔
243
                self._api_delete = partial(k8s_api.delete_namespaced_custom_object, **kwargs)
5✔
244
                self._api_list = partial(k8s_api.list_namespaced_custom_object, **kwargs)
5✔
245
            else:
246
                self._api_get = partial(k8s_api.get_cluster_custom_object, **kwargs)
×
247
                self._api_patch = partial(k8s_api.patch_cluster_custom_object, **kwargs)
×
248
                self._api_create = partial(k8s_api.create_cluster_custom_object, **kwargs)
×
249
                self._api_delete = partial(k8s_api.delete_cluster_custom_object, **kwargs)
×
250
                self._api_list = partial(k8s_api.list_cluster_custom_object, **kwargs)
×
251
        else:
252
            # Take care for the case e.g. api_type is "apiextensions.k8s.io"
253
            # Only replace the last instance
254
            group = "".join(group.rsplit(".k8s.io", 1))
5✔
255

256
            # convert group name from DNS subdomain format to
257
            # python class name convention
258
            group = "".join(word.capitalize() for word in group.split('.'))
5✔
259
            fcn_to_call = f"{group}{version.capitalize()}Api"
5✔
260
            k8s_api = getattr(k8s_client_module, fcn_to_call)(k8s_client)
5✔
261

262
            # Replace CamelCased action_type into snake_case
263
            kind = UPPER_FOLLOWED_BY_LOWER_RE.sub(r"\1_\2", kind)
5✔
264
            kind = LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE.sub(r"\1_\2", kind).lower()
5✔
265

266
            if self.namespaced:
5✔
267
                self._api_get = getattr(k8s_api, f"read_namespaced_{kind}")
5✔
268
                self._api_patch = getattr(k8s_api, f"patch_namespaced_{kind}")
5✔
269
                self._api_create = getattr(k8s_api, f"create_namespaced_{kind}")
5✔
270
                self._api_delete = getattr(k8s_api, f"delete_namespaced_{kind}")
5✔
271
                self._api_list = getattr(k8s_api, f"list_namespaced_{kind}")
5✔
272
            else:
273
                self._api_get = getattr(k8s_api, f"read_{kind}")
5✔
274
                self._api_patch = getattr(k8s_api, f"patch_{kind}")
5✔
275
                self._api_create = getattr(k8s_api, f"create_{kind}")
5✔
276
                self._api_delete = getattr(k8s_api, f"delete_{kind}")
5✔
277
                self._api_list = getattr(k8s_api, f"list_{kind}")
5✔
278

279

280
class K8SResourceKey(namedtuple("K8SResourceKey", ["group", "kind", "name", "namespace"])):
5✔
281
    __slots__ = ()
5✔
282

283
    def __str__(self):
5✔
284
        return (f"{self.group}{'/' if self.group else 'v1/'}{self.kind}"
×
285
                f"/{self.name}{f'.{self.namespace}' if self.namespace else ''}")
286

287

288
class K8SResource:
5✔
289
    _k8s_client_version = None
5✔
290
    _k8s_field_validation = None
5✔
291
    _k8s_field_validation_patched = None
5✔
292
    _logger = None
5✔
293
    _api_warnings = None
5✔
294

295
    def __init__(self, manifest: dict, rdef: K8SResourceDef, source: Union[str, Path] = None):
5✔
296
        self.key = self.get_manifest_key(manifest)
5✔
297

298
        self.manifest = manifest
5✔
299
        self.rdef = rdef
5✔
300
        self.source = source
5✔
301

302
    @property
5✔
303
    def group(self) -> str:
5✔
304
        return self.key.group
5✔
305

306
    @property
5✔
307
    def version(self) -> str:
5✔
308
        return self.rdef.version
5✔
309

310
    @property
5✔
311
    def kind(self) -> str:
5✔
312
        return self.key.kind
5✔
313

314
    @property
5✔
315
    def name(self) -> str:
5✔
316
        return self.key.name
5✔
317

318
    @name.setter
5✔
319
    def name(self, value):
5✔
320
        self.manifest["metadata"]["name"] = value
×
321
        self.key = self.get_manifest_key(self.manifest)
×
322

323
    @property
5✔
324
    def namespace(self) -> Optional[str]:
5✔
325
        return self.key.namespace
5✔
326

327
    @namespace.setter
5✔
328
    def namespace(self, value):
5✔
329
        self.manifest["metadata"]["namespace"] = value
×
330
        self.key = self.get_manifest_key(self.manifest)
×
331

332
    @property
5✔
333
    def api_version(self) -> str:
5✔
334
        return self.manifest["apiVersion"]
5✔
335

336
    @property
5✔
337
    def schema(self) -> dict:
5✔
338
        return self.rdef.schema
×
339

340
    @property
5✔
341
    def is_crd(self):
5✔
342
        return self.group == "apiextensions.k8s.io" and self.kind == "CustomResourceDefinition"
5✔
343

344
    def __str__(self):
5✔
345
        return f"{self.api_version}/{self.kind}/{self.name}{'.' + self.namespace if self.namespace else ''}"
5✔
346

347
    @_normalize_api_exc
5✔
348
    def get(self):
5✔
349
        rdef = self.rdef
5✔
350
        kwargs = {"name": self.name,
5✔
351
                  "_preload_content": False}
352
        if rdef.namespaced:
5✔
353
            kwargs["namespace"] = self.namespace
5✔
354
        return json.loads(self.rdef.get(**kwargs).data)
5✔
355

356
    @_normalize_api_exc
5✔
357
    def create(self, dry_run=True):
5✔
358
        rdef = self.rdef
5✔
359
        kwargs = {"body": self.manifest,
5✔
360
                  "_preload_content": False,
361
                  "field_manager": "kubernator",
362
                  }
363

364
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
365
        if self._k8s_field_validation_patched or not self.rdef.custom:
5!
366
            kwargs["field_validation"] = self._k8s_field_validation
5✔
367
        if rdef.namespaced:
5✔
368
            kwargs["namespace"] = self.namespace
5✔
369
        if dry_run:
5✔
370
            kwargs["dry_run"] = "All"
5✔
371
        resp = rdef.create(**kwargs)
5✔
372
        self._process_response_headers(resp)
5✔
373
        return json.loads(resp.data)
5✔
374

375
    @_normalize_api_exc
5✔
376
    def patch(self, json_patch, *, patch_type: K8SResourcePatchType, force=False, dry_run=True):
5✔
377
        rdef = self.rdef
5✔
378
        kwargs = {"name": self.name,
5✔
379
                  "body": json_patch,
380
                  "_preload_content": False,
381
                  "field_manager": "kubernator",
382
                  }
383

384
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
385
        if self._k8s_field_validation_patched or not self.rdef.custom:
5!
386
            kwargs["field_validation"] = self._k8s_field_validation
5✔
387
        if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
5✔
388
            kwargs["force"] = force
5✔
389
        if rdef.namespaced:
5✔
390
            kwargs["namespace"] = self.namespace
5✔
391
        if dry_run:
5✔
392
            kwargs["dry_run"] = "All"
5✔
393

394
        def select_header_content_type_patch(content_types):
5✔
395
            if patch_type == K8SResourcePatchType.JSON_PATCH:
5✔
396
                return "application/json-patch+json"
5✔
397
            if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
5!
398
                return "application/apply-patch+yaml"
5✔
399
            raise NotImplementedError
×
400

401
        if isinstance(rdef.patch, partial):
5!
402
            api_client = rdef.patch.func.__self__.api_client
×
403
        else:
404
            api_client = rdef.patch.__self__.api_client
5✔
405

406
        old_func = api_client.select_header_content_type
5✔
407
        try:
5✔
408
            api_client.select_header_content_type = select_header_content_type_patch
5✔
409
            resp = rdef.patch(**kwargs)
5✔
410
            self._process_response_headers(resp)
5✔
411
            return json.loads(resp.data)
5✔
412
        finally:
413
            api_client.select_header_content_type = old_func
5✔
414

415
    @_normalize_api_exc
5✔
416
    def delete(self, *, dry_run=True, propagation_policy=K8SPropagationPolicy.BACKGROUND, wait=True):
5✔
417
        from kubernetes.client import ApiException
5✔
418
        rdef = self.rdef
5✔
419
        kwargs = {"name": self.name,
5✔
420
                  "_preload_content": False,
421
                  "propagation_policy": propagation_policy.policy
422
                  }
423
        if rdef.namespaced:
5!
424
            kwargs["namespace"] = self.namespace
5✔
425
        if dry_run:
5✔
426
            kwargs["dry_run"] = "All"
5✔
427

428
        result = json.loads(rdef.delete(**kwargs).data)
5✔
429

430
        if wait and not dry_run:
5!
431
            # Wait for the resource to actually disappear by watching for the
432
            # DELETED event. If the resource is already gone before the watch
433
            # opens (race), the watch produces no events; loop with a short
434
            # server-side timeout and re-check existence via get() to break out.
435
            while True:
5✔
436
                for event in self.watch(timeout_seconds=10):
5✔
437
                    if event["type"] == "DELETED":
4✔
438
                        return result
4✔
UNCOV
439
                try:
1✔
UNCOV
440
                    self.get()
1✔
UNCOV
441
                except ApiException as e:
1✔
UNCOV
442
                    if e.status == 404:
1!
UNCOV
443
                        return result
1✔
444
                    raise
×
445

446
        return result
×
447

448
    def watch(self, *, timeout_seconds=None):
5✔
449
        from kubernetes import watch as k8s_watch
5✔
450
        rdef = self.rdef
5✔
451
        kwargs = {"field_selector": f"metadata.name={self.name}"}
5✔
452
        if rdef.namespaced:
5!
453
            kwargs["namespace"] = self.namespace
5✔
454
        if timeout_seconds is not None:
5!
455
            kwargs["timeout_seconds"] = timeout_seconds
5✔
456
        return k8s_watch.Watch().stream(rdef.list, **kwargs)
5✔
457

458
    @staticmethod
5✔
459
    def get_manifest_key(manifest):
5✔
460
        return K8SResourceKey(to_group_and_version(manifest["apiVersion"])[0],
5✔
461
                              manifest["kind"],
462
                              manifest["metadata"]["name"],
463
                              manifest["metadata"].get("namespace"))
464

465
    @staticmethod
5✔
466
    def get_manifest_description(manifest: dict, source=None):
5✔
467
        api_version = manifest.get("apiVersion")
5✔
468
        kind = manifest.get("kind")
5✔
469
        metadata = manifest.get("metadata")
5✔
470
        name = None
5✔
471
        namespace = None
5✔
472
        if metadata:
5!
473
            name = metadata.get("name")
5✔
474
            namespace = metadata.get("namespace")
5✔
475
        return (f"{api_version or 'unknown'}/{kind or '<unknown>'}/"
5✔
476
                f"{name or '<unknown>'}{'.' + namespace if namespace else ''}")
477

478
    def __eq__(self, other):
5✔
479
        if not isinstance(other, K8SResource):
×
480
            return False
×
481
        return self.key == other.key and self.manifest == other.manifest
×
482

483
    def _process_response_headers(self, resp):
5✔
484
        headers = resp.headers
5✔
485
        warn_headers = headers.get("Warning")
5✔
486
        if warn_headers:
5✔
487
            for warn in K8S_WARNING_HEADER.findall(warn_headers):
5✔
488
                code, _, msg, _ = warn
5✔
489
                code = int(code)
5✔
490
                msg = msg.encode("utf-8").decode("unicode_escape")
5✔
491
                if code == 299:
5!
492
                    self._api_warnings(self, msg)
5✔
493
                else:
494
                    self._logger.warning("Unknown API warning received for resource %s from %s: code %d: %s",
×
495
                                         self, self.source, code, msg)
496

497

498
class K8SResourcePluginMixin:
5✔
499
    def __init__(self):
5✔
500
        self.validator = None
5✔
501
        self.resources: MutableMapping[K8SResourceKey, K8SResource] = {}
5✔
502

503
    def _require_validator(self):
5✔
504
        if self.validator is None:
5!
NEW
505
            raise RuntimeError(
×
506
                "K8S validator not initialised; handle_start() must run first")
507
        return self.validator
5✔
508

509
    @property
5✔
510
    def resource_definitions(self) -> MutableMapping[K8SResourceDefKey, K8SResourceDef]:
5✔
511
        return self._require_validator().resource_definitions
5✔
512

513
    @property
5✔
514
    def resource_paths(self) -> MutableMapping[K8SResourceDefKey, MutableMapping[str, dict]]:
5✔
NEW
515
        return self._require_validator().resource_paths
×
516

517
    def add_resources(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
5✔
518
        if not source:
5✔
519
            source = calling_frame_source()
5✔
520

521
        if isinstance(manifests, str):
5✔
522
            manifests = list(parse_yaml_docs(manifests, source))
5✔
523

524
        if isinstance(manifests, (Mapping, dict)):
5!
525
            return self.add_resource(manifests, source)
×
526
        else:
527
            return [self.add_resource(m, source) for m in manifests if m]
5✔
528

529
    def add_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
530
        if not source:
5!
531
            source = calling_frame_source()
×
532
        resource = self._create_resource(manifest, source)
5✔
533

534
        try:
5✔
535
            trans_resource = self._transform_resource(list(self.resources.values()), resource)
5✔
536
        except Exception as e:
×
537
            self.logger.error("An error occurred running transformers on %s", resource, exc_info=e)
×
538
            raise
×
539

540
        errors = list(self._validate_resource(trans_resource.manifest, source))
5✔
541
        if errors:
5!
542
            for error in errors:
×
543
                if source:
×
544
                    self.logger.error("Error detected in re-transformed K8S resource %s generated through %s",
×
545
                                      trans_resource, source, exc_info=error)
546
            raise errors[0]
×
547

548
        return self._add_resource(trans_resource, source)
5✔
549

550
    def add_crds(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
5✔
551
        if not source:
×
552
            source = calling_frame_source()
×
553

554
        if isinstance(manifests, str):
×
555
            manifests = list(parse_yaml_docs(manifests, source))
×
556

557
        if isinstance(manifests, (Mapping, dict)):
×
558
            return self.add_crd(manifests, source)
×
559
        else:
560
            return [self.add_crd(m, source) for m in manifests if m]
×
561

562
    def add_crd(self, manifest: dict, source: Union[str, Path] = None):
5✔
563
        if not source:
5!
564
            source = calling_frame_source()
×
565
        resource = self._create_resource(manifest, source)
5✔
566
        if not resource.is_crd:
5!
567
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
568
            raise ValueError(f"K8S manifest {resource_description} from {source} is not a CRD")
×
569

570
        self._add_crd(resource)
5✔
571
        return resource
5✔
572

573
    def create_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
574
        """Create K8S resource without adding it"""
575
        if not source:
×
576
            source = calling_frame_source()
×
577

578
        return self._create_resource(manifest, source)
×
579

580
    def add_local_resources(self, path: Path, file_type: FileType, source: str = None):
5✔
581
        manifests = load_file(self.logger, path, file_type)
×
582

583
        return [self.add_resource(m, source or path) for m in manifests if m]
×
584

585
    def add_remote_resources(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
5✔
586
                             source: str = None):
587
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
×
588

589
        return [self.add_resource(m, source or url) for m in manifests if m]
×
590

591
    def add_local_crds(self, path: Path, file_type: FileType, source: str = None):
5✔
592
        manifests = load_file(self.logger, path, file_type)
5✔
593

594
        return [self.add_crd(m, source or path) for m in manifests if m]
5✔
595

596
    def add_remote_crds(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
5✔
597
                        source: str = None):
598
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
5✔
599

600
        return [self.add_crd(m, source or url) for m in manifests if m]
5✔
601

602
    def get_api_versions(self):
5✔
603
        return self.validator.api_versions()
5✔
604

605
    def _create_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
606
        resource_description = K8SResource.get_manifest_description(manifest, source)
5✔
607

608
        new_manifest = self._patch_manifest(manifest, resource_description)
5✔
609
        if new_manifest != manifest:
5!
610
            manifest = new_manifest
×
611
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
612

613
        self.logger.debug("Validating K8S manifest for %s", resource_description)
5✔
614
        errors = list(self._validate_resource(manifest, source))
5✔
615
        if errors:
5✔
616
            for error in errors:
5✔
617
                self.logger.error("Error detected in K8S manifest %s from %s: \n%s",
5✔
618
                                  resource_description, source or "<unknown>", yaml.safe_dump(manifest, None),
619
                                  exc_info=error)
620
            raise errors[0]
5✔
621

622
        rdef = self._get_manifest_rdef(manifest)
5✔
623
        return K8SResource(manifest, rdef, source)
5✔
624

625
    def _add_resource(self, resource: K8SResource, source):
5✔
626
        if resource.key in self.resources:
5!
627
            existing_resource = self.resources[resource.key]
×
628
            if resource != existing_resource:
×
629
                raise ValidationError("resource %s from %s already exists and was added from %s" %
×
630
                                      (resource.key, resource.source, existing_resource.source))
631
            self.logger.trace("K8S resource for %s from %s is already present and is identical", resource, source)
×
632
            return existing_resource
×
633

634
        self.logger.info("Adding K8S resource for %s from %s", resource, source)
5✔
635
        self.resources[resource.key] = resource
5✔
636

637
        if resource.is_crd:
5✔
638
            self._add_crd(resource)
5✔
639

640
        return resource
5✔
641

642
    def _patch_manifest(self,
5✔
643
                        manifest: dict,
644
                        resource_description: str):
645
        return manifest
5✔
646

647
    def _transform_resource(self,
5✔
648
                            resources: Sequence[K8SResource],
649
                            resource: K8SResource) -> K8SResource:
650
        return resource
5✔
651

652
    def _filter_resources(self, func: Callable[[K8SResource], bool]):
5✔
653
        yield from filter(func, self.resources.values())
×
654

655
    def _validate_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
656
        yield from self._require_validator().iter_manifest_errors(manifest)
5✔
657

658
    def _get_manifest_rdef(self, manifest):
5✔
659
        return self._require_validator().get_manifest_rdef(manifest)
5✔
660

661
    def _add_crd(self, resource: K8SResource):
5✔
662
        for crd in K8SResourceDef.from_resource(resource):
5✔
663
            self.logger.info("Adding K8S CRD definition %s", crd.key)
5✔
664
            self.resource_definitions[crd.key] = crd
5✔