• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 24580658204

17 Apr 2026 06:29PM UTC coverage: 77.939% (+1.0%) from 76.951%
24580658204

push

github

web-flow
Refactor k8s apply: SSA test-ops, 409 retry, watch-based delete, imperative API (#102)

## Summary

- Harden the K8s apply path with SSA-derived JSON-patch `test` ops for
`/metadata/uid` and `/metadata/resourceVersion`, and wrap the SSA patch
branch in a 409-retry loop that recomputes the patch on conflict.
- Extend `K8SResource` with `list()`, `watch()`, `delete(wait=True)`
that uses watch + `get()` recheck, and add a `resource_generator()`
extension seam on `KubernetesPlugin`.
- Add `ktor.k8s.resource(manifest)` — a fully-wired `K8SResource`
factory for imperative CRUD from `.kubernator.py` scripts, bypassing the
declarative apply lifecycle.
- Thread the applied manifest as a 4th return from `_apply_resource` and
its inner patch/create/delete callables for external consumers.
- Move dump-file open/close from `app.py` into
`KubernetesPlugin.handle_apply`; fix dump-mode `patch_func` to seed
uid/RV on a copy of the local manifest by extracting them from the
patch's own test ops (via `jsonpointer`), so the simulated post-patch
manifest can be returned honestly.
- Rewrite `delete_create` and `resource_version_merge` integration tests
as single-phase using the new imperative API, replacing `kubectl`-based
phase-1 setup and the `TEST_PHASE` env re-invocation.
- Add unit test for 409 retry and integration tests for `watch()` and
`resource_generator`. Port `resource_version_merge` from minikube to
kind.

## Test plan

- [x] `pyb -vX run_unit_tests` (32 tests, all green — includes new
`k8s_apply_retry_tests`)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=delete_create_tests.py` (single-phase, kind,
immutable-field delete+recreate path)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=resource_version_merge_tests.py`
(single-phase, kind, dump mode patch verification)
- [ ] Full `pyb -vX run_integration_tests` suite via CI across the
Python 3.10–3.14 matrix

614 of 976 branches covered (62.91%)

Branch coverage included in aggregate %.

137 of 163 new or added lines in 4 files covered. (84.05%)

5 existing lines in 3 files now uncovered.

3039 of 3711 relevant lines covered (81.89%)

4.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.94
/src/main/python/kubernator/plugins/k8s_api.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import base64
5✔
20
import json
5✔
21
import re
5✔
22
from collections import namedtuple
5✔
23
from collections.abc import Callable, Mapping, MutableMapping, Sequence, Iterable
5✔
24
from enum import Enum, auto
5✔
25
from functools import partial, wraps
5✔
26
from pathlib import Path
5✔
27
from typing import Union, Optional
5✔
28

29
import yaml
5✔
30
from jsonschema._format import FormatChecker
5✔
31
from jsonschema._keywords import required
5✔
32
from jsonschema.exceptions import ValidationError
5✔
33
from jsonschema.validators import extend, Draft7Validator
5✔
34
from openapi_schema_validator import OAS31Validator
5✔
35

36
from kubernator.api import load_file, FileType, load_remote_file, calling_frame_source, parse_yaml_docs
5✔
37

38

39
def api_exc_normalize_body(e):
5✔
40
    """Parse a raw ApiException body (JSON or YAML) into a Python object in-place.
41

42
    Idempotent: if e.body is already a parsed object, it is left unchanged.
43
    """
44
    if not isinstance(e.body, (str, bytes)):
5!
NEW
45
        return
×
46
    if e.headers and "content-type" in e.headers:
5!
47
        content_type = e.headers["content-type"]
5✔
48
        if content_type == "application/json" or content_type.endswith("+json"):
5!
49
            e.body = json.loads(e.body)
5✔
NEW
50
        elif (content_type in ("application/yaml", "application/x-yaml", "text/yaml",
×
51
                               "text/x-yaml") or content_type.endswith("+yaml")):
NEW
52
            e.body = yaml.safe_load(e.body)
×
53

54

55
def api_exc_format_body(e):
5✔
56
    """Format an ApiException body back to a string for human-readable display.
57

58
    Idempotent: if e.body is already a string/bytes, it is left unchanged.
59
    """
60
    if not isinstance(e.body, (str, bytes)):
5!
61
        e.body = json.dumps(e.body, indent=4)
5✔
62

63

64
def _normalize_api_exc(func):
5✔
65
    """Decorator: normalize ApiException body before propagating."""
66
    @wraps(func)
5✔
67
    def wrapper(*args, **kwargs):
5✔
68
        from kubernetes.client import ApiException
5✔
69
        try:
5✔
70
            return func(*args, **kwargs)
5✔
71
        except ApiException as e:
5✔
72
            api_exc_normalize_body(e)
5✔
73
            raise
5✔
74
    return wrapper
5✔
75

76

77
K8S_WARNING_HEADER = re.compile(r'(?:,\s*)?(\d{3})\s+(\S+)\s+"(.+?)(?<!\\)"(?:\s+\"(.+?)(?<!\\)\")?\s*')
5✔
78
UPPER_FOLLOWED_BY_LOWER_RE = re.compile(r"(.)([A-Z][a-z]+)")
5✔
79
LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")
5✔
80

81
K8S_MINIMAL_RESOURCE_SCHEMA = {
5✔
82
    "properties": {
83
        "apiVersion": {
84
            "type": "string"
85
        },
86
        "kind": {
87
            "type": "string"
88
        },
89
        "metadata": {
90
            "type": "object",
91
            "properties": {
92
                "name": {
93
                    "type": "string"
94
                },
95
                "namespace": {
96
                    "type": "string"
97
                }
98
            },
99
            "required": ["name"]
100
        }
101
    },
102
    "type": "object",
103
    "required": ["apiVersion", "kind"]
104
}
105
K8S_MINIMAL_RESOURCE_VALIDATOR = Draft7Validator(K8S_MINIMAL_RESOURCE_SCHEMA)
5✔
106

107
CLUSTER_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}([^/]+)$")
5✔
108
NAMESPACED_RESOURCE_PATH = re.compile(r"^/apis?/(?:[^/]+/){1,2}namespaces/[^/]+/([^/]+)$")
5✔
109

110

111
class K8SResourcePatchType(Enum):
5✔
112
    JSON_PATCH = auto()
5✔
113
    SERVER_SIDE_PATCH = auto()
5✔
114

115

116
class K8SPropagationPolicy(Enum):
5✔
117
    BACKGROUND = ("Background",)
5✔
118
    FOREGROUND = ("Foreground",)
5✔
119
    ORPHAN = ("Orphan",)
5✔
120

121
    def __init__(self, policy):
5✔
122
        self.policy = policy
5✔
123

124

125
def is_integer(instance):
5✔
126
    # bool inherits from int, so ensure bools aren't reported as ints
127
    if isinstance(instance, bool):
5!
128
        return False
×
129
    return isinstance(instance, int)
5✔
130

131

132
def is_string(instance):
5✔
133
    return isinstance(instance, str)
5✔
134

135

136
def type_validator(validator, data_type, instance, schema):
5✔
137
    if instance is None:
5!
138
        return
×
139

140
    if data_type == "string" and schema.get("format") == "int-or-string":
5✔
141
        if not (is_string(instance) or is_integer(instance)):
5!
142
            yield ValidationError("%r is not of type %s" % (instance, "int-or-string"))
×
143
    elif not validator.is_type(instance, data_type):
5!
144
        yield ValidationError("%r is not of type %s" % (instance, data_type))
×
145

146

147
K8SValidator = extend(OAS31Validator, validators={
5✔
148
    "type": type_validator,
149
    "required": required
150
})
151

152
k8s_format_checker = FormatChecker()
5✔
153

154

155
@k8s_format_checker.checks("int32")
5✔
156
def check_int32(value):
5✔
157
    return value is not None and (-2147483648 < value < 2147483647)
5✔
158

159

160
@k8s_format_checker.checks("int64")
5✔
161
def check_int64(value):
5✔
162
    return value is not None and (-9223372036854775808 < value < 9223372036854775807)
5✔
163

164

165
@k8s_format_checker.checks("float")
5✔
166
def check_float(value):
5✔
167
    return value is not None and (-3.4E+38 < value < +3.4E+38)
×
168

169

170
@k8s_format_checker.checks("double")
5✔
171
def check_double(value):
5✔
172
    return value is not None and (-1.7E+308 < value < +1.7E+308)
5✔
173

174

175
@k8s_format_checker.checks("byte", ValueError)
5✔
176
def check_byte(value):
5✔
177
    if value is None:
5!
178
        return False
×
179
    base64.b64decode(value, validate=True)
5✔
180
    return True
5✔
181

182

183
@k8s_format_checker.checks("int-or-string")
5✔
184
def check_int_or_string(value):
5✔
185
    return check_int32(value) if is_integer(value) else is_string(value)
5✔
186

187

188
def to_group_and_version(api_version):
5✔
189
    group, _, version = api_version.partition("/")
5✔
190
    if not version:
5✔
191
        version = group
5✔
192
        group = ""
5✔
193
    return group, version
5✔
194

195

196
def to_k8s_resource_def_key(manifest):
5✔
197
    return K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]),
×
198
                             manifest["kind"])
199

200

201
class K8SResourceDefKey(namedtuple("K8SResourceDefKey", ["group", "version", "kind"])):
5✔
202
    __slots__ = ()
5✔
203

204
    def __str__(self):
5✔
205
        return f"{self.group}{'/' if self.group else '/'}{self.version}/{self.kind}"
5✔
206

207

208
class K8SResourceDef:
5✔
209
    def __init__(self, key, singular, plural, namespaced, custom, schema):
5✔
210
        self.key = key
5✔
211
        self.singular = singular
5✔
212
        self.plural = plural
5✔
213
        self.namespaced = namespaced
5✔
214
        self.custom = custom
5✔
215
        self.schema = schema
5✔
216

217
        self._api_get = None
5✔
218
        self._api_create = None
5✔
219
        self._api_patch = None
5✔
220
        self._api_delete = None
5✔
221
        self._api_list = None
5✔
222

223
    @property
5✔
224
    def group(self) -> str:
5✔
225
        return self.key.group
5✔
226

227
    @property
5✔
228
    def version(self) -> str:
5✔
229
        return self.key.version
5✔
230

231
    @property
5✔
232
    def kind(self) -> str:
5✔
233
        return self.key.kind
5✔
234

235
    @property
5✔
236
    def has_api(self) -> bool:
5✔
237
        return self.custom or self.plural
5✔
238

239
    @property
5✔
240
    def get(self):
5✔
241
        return self._api_get
5✔
242

243
    @property
5✔
244
    def create(self):
5✔
245
        return self._api_create
5✔
246

247
    @property
5✔
248
    def patch(self):
5✔
249
        return self._api_patch
5✔
250

251
    @property
5✔
252
    def delete(self):
5✔
253
        return self._api_delete
5✔
254

255
    @property
5✔
256
    def list(self):
5✔
257
        return self._api_list
5✔
258

259
    def __eq__(self, o: object) -> bool:
5✔
260
        if not isinstance(o, K8SResourceDef):
×
261
            return False
×
262

263
        return (self.key == o.key and
×
264
                self.singular == o.singular and
265
                self.plural == o.plural and
266
                self.namespaced == o.namespaced and
267
                self.custom == o.custom)
268

269
    def __hash__(self) -> int:
5✔
270
        return self.key.__hash__()
×
271

272
    def __str__(self):
5✔
273
        return f"{self.key=}, {self.singular=}, {self.plural=}, {self.namespaced=}, {self.custom=}"
×
274

275
    @classmethod
5✔
276
    def from_manifest(cls, key: K8SResourceDefKey,
5✔
277
                      schema,
278
                      paths: Mapping[K8SResourceDefKey, Mapping[str, Mapping]]):
279
        singular = key.kind.lower()
5✔
280

281
        plural = None
5✔
282
        namespaced = False
5✔
283

284
        if singular == "namespace":
5✔
285
            plural = "namespaces"
5✔
286
        else:
287
            for path in paths.get(key, ()):
5✔
288
                if m := NAMESPACED_RESOURCE_PATH.fullmatch(path):
5✔
289
                    plural = m[1]
5✔
290
                    namespaced = True
5✔
291
                    break
5✔
292
                elif m := CLUSTER_RESOURCE_PATH.fullmatch(path):
5✔
293
                    plural = m[1]
5✔
294

295
        yield K8SResourceDef(key, singular, plural, namespaced, False, schema)
5✔
296

297
    @classmethod
5✔
298
    def from_resource(cls, resource: "K8SResource"):
5✔
299
        manifest = resource.manifest
5✔
300
        spec = manifest["spec"]
5✔
301
        group = spec["group"]
5✔
302
        names = spec["names"]
5✔
303
        kind = names["kind"]
5✔
304
        singular = names.get("singular", names["kind"].lower())
5✔
305
        plural = names["plural"]
5✔
306
        namespaced = spec["scope"] == "Namespaced"
5✔
307

308
        for version_spec in spec["versions"]:
5✔
309
            version = version_spec["name"]
5✔
310
            if resource.version == "v1":
5!
311
                schema = version_spec["schema"]["openAPIV3Schema"]
5✔
312
            else:
313
                schema = spec["validation"]["openAPIV3Schema"]
×
314
            yield K8SResourceDef(K8SResourceDefKey(group, version, kind), singular, plural, namespaced, True, schema)
5✔
315

316
    def populate_api(self, k8s_client_module, k8s_client):
5✔
317
        if not self.has_api:
5!
318
            raise RuntimeError(f"{self} has no API")
×
319

320
        if self._api_get:
5✔
321
            return
5✔
322

323
        group = self.group or "core"
5✔
324
        version = self.version
5✔
325
        kind = self.kind
5✔
326

327
        if self.custom:
5✔
328
            k8s_api = k8s_client_module.CustomObjectsApi(k8s_client)
5✔
329

330
            kwargs = {"group": group,
5✔
331
                      "version": version,
332
                      "plural": self.plural}
333
            if self.namespaced:
5!
334
                self._api_get = partial(k8s_api.get_namespaced_custom_object, **kwargs)
5✔
335
                self._api_patch = partial(k8s_api.patch_namespaced_custom_object, **kwargs)
5✔
336
                self._api_create = partial(k8s_api.create_namespaced_custom_object, **kwargs)
5✔
337
                self._api_delete = partial(k8s_api.delete_namespaced_custom_object, **kwargs)
5✔
338
                self._api_list = partial(k8s_api.list_namespaced_custom_object, **kwargs)
5✔
339
            else:
340
                self._api_get = partial(k8s_api.get_cluster_custom_object, **kwargs)
×
341
                self._api_patch = partial(k8s_api.patch_cluster_custom_object, **kwargs)
×
342
                self._api_create = partial(k8s_api.create_cluster_custom_object, **kwargs)
×
343
                self._api_delete = partial(k8s_api.delete_cluster_custom_object, **kwargs)
×
NEW
344
                self._api_list = partial(k8s_api.list_cluster_custom_object, **kwargs)
×
345
        else:
346
            # Take care for the case e.g. api_type is "apiextensions.k8s.io"
347
            # Only replace the last instance
348
            group = "".join(group.rsplit(".k8s.io", 1))
5✔
349

350
            # convert group name from DNS subdomain format to
351
            # python class name convention
352
            group = "".join(word.capitalize() for word in group.split('.'))
5✔
353
            fcn_to_call = f"{group}{version.capitalize()}Api"
5✔
354
            k8s_api = getattr(k8s_client_module, fcn_to_call)(k8s_client)
5✔
355

356
            # Replace CamelCased action_type into snake_case
357
            kind = UPPER_FOLLOWED_BY_LOWER_RE.sub(r"\1_\2", kind)
5✔
358
            kind = LOWER_OR_NUM_FOLLOWED_BY_UPPER_RE.sub(r"\1_\2", kind).lower()
5✔
359

360
            if self.namespaced:
5✔
361
                self._api_get = getattr(k8s_api, f"read_namespaced_{kind}")
5✔
362
                self._api_patch = getattr(k8s_api, f"patch_namespaced_{kind}")
5✔
363
                self._api_create = getattr(k8s_api, f"create_namespaced_{kind}")
5✔
364
                self._api_delete = getattr(k8s_api, f"delete_namespaced_{kind}")
5✔
365
                self._api_list = getattr(k8s_api, f"list_namespaced_{kind}")
5✔
366
            else:
367
                self._api_get = getattr(k8s_api, f"read_{kind}")
5✔
368
                self._api_patch = getattr(k8s_api, f"patch_{kind}")
5✔
369
                self._api_create = getattr(k8s_api, f"create_{kind}")
5✔
370
                self._api_delete = getattr(k8s_api, f"delete_{kind}")
5✔
371
                self._api_list = getattr(k8s_api, f"list_{kind}")
5✔
372

373

374
class K8SResourceKey(namedtuple("K8SResourceKey", ["group", "kind", "name", "namespace"])):
5✔
375
    __slots__ = ()
5✔
376

377
    def __str__(self):
5✔
378
        return (f"{self.group}{'/' if self.group else 'v1/'}{self.kind}"
×
379
                f"/{self.name}{f'.{self.namespace}' if self.namespace else ''}")
380

381

382
class K8SResource:
5✔
383
    _k8s_client_version = None
5✔
384
    _k8s_field_validation = None
5✔
385
    _k8s_field_validation_patched = None
5✔
386
    _logger = None
5✔
387
    _api_warnings = None
5✔
388

389
    def __init__(self, manifest: dict, rdef: K8SResourceDef, source: Union[str, Path] = None):
5✔
390
        self.key = self.get_manifest_key(manifest)
5✔
391

392
        self.manifest = manifest
5✔
393
        self.rdef = rdef
5✔
394
        self.source = source
5✔
395

396
    @property
5✔
397
    def group(self) -> str:
5✔
398
        return self.key.group
5✔
399

400
    @property
5✔
401
    def version(self) -> str:
5✔
402
        return self.rdef.version
5✔
403

404
    @property
5✔
405
    def kind(self) -> str:
5✔
406
        return self.key.kind
5✔
407

408
    @property
5✔
409
    def name(self) -> str:
5✔
410
        return self.key.name
5✔
411

412
    @name.setter
5✔
413
    def name(self, value):
5✔
414
        self.manifest["metadata"]["name"] = value
×
415
        self.key = self.get_manifest_key(self.manifest)
×
416

417
    @property
5✔
418
    def namespace(self) -> Optional[str]:
5✔
419
        return self.key.namespace
5✔
420

421
    @namespace.setter
5✔
422
    def namespace(self, value):
5✔
423
        self.manifest["metadata"]["namespace"] = value
×
424
        self.key = self.get_manifest_key(self.manifest)
×
425

426
    @property
5✔
427
    def api_version(self) -> str:
5✔
428
        return self.manifest["apiVersion"]
5✔
429

430
    @property
5✔
431
    def schema(self) -> dict:
5✔
432
        return self.rdef.schema
×
433

434
    @property
5✔
435
    def is_crd(self):
5✔
436
        return self.group == "apiextensions.k8s.io" and self.kind == "CustomResourceDefinition"
5✔
437

438
    def __str__(self):
5✔
439
        return f"{self.api_version}/{self.kind}/{self.name}{'.' + self.namespace if self.namespace else ''}"
5✔
440

441
    @_normalize_api_exc
5✔
442
    def get(self):
5✔
443
        rdef = self.rdef
5✔
444
        kwargs = {"name": self.name,
5✔
445
                  "_preload_content": False}
446
        if rdef.namespaced:
5✔
447
            kwargs["namespace"] = self.namespace
5✔
448
        return json.loads(self.rdef.get(**kwargs).data)
5✔
449

450
    @_normalize_api_exc
5✔
451
    def create(self, dry_run=True):
5✔
452
        rdef = self.rdef
5✔
453
        kwargs = {"body": self.manifest,
5✔
454
                  "_preload_content": False,
455
                  "field_manager": "kubernator",
456
                  }
457

458
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
459
        if self._k8s_field_validation_patched or not self.rdef.custom:
5!
460
            kwargs["field_validation"] = self._k8s_field_validation
5✔
461
        if rdef.namespaced:
5✔
462
            kwargs["namespace"] = self.namespace
5✔
463
        if dry_run:
5✔
464
            kwargs["dry_run"] = "All"
5✔
465
        resp = rdef.create(**kwargs)
5✔
466
        self._process_response_headers(resp)
5✔
467
        return json.loads(resp.data)
5✔
468

469
    @_normalize_api_exc
5✔
470
    def patch(self, json_patch, *, patch_type: K8SResourcePatchType, force=False, dry_run=True):
5✔
471
        rdef = self.rdef
5✔
472
        kwargs = {"name": self.name,
5✔
473
                  "body": json_patch,
474
                  "_preload_content": False,
475
                  "field_manager": "kubernator",
476
                  }
477

478
        # `and not self.rdef.custom` to be removed after solving https://github.com/kubernetes-client/gen/issues/259
479
        if self._k8s_field_validation_patched or not self.rdef.custom:
5!
480
            kwargs["field_validation"] = self._k8s_field_validation
5✔
481
        if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
5✔
482
            kwargs["force"] = force
5✔
483
        if rdef.namespaced:
5✔
484
            kwargs["namespace"] = self.namespace
5✔
485
        if dry_run:
5✔
486
            kwargs["dry_run"] = "All"
5✔
487

488
        def select_header_content_type_patch(content_types):
5✔
489
            if patch_type == K8SResourcePatchType.JSON_PATCH:
5✔
490
                return "application/json-patch+json"
5✔
491
            if patch_type == K8SResourcePatchType.SERVER_SIDE_PATCH:
5!
492
                return "application/apply-patch+yaml"
5✔
493
            raise NotImplementedError
×
494

495
        if isinstance(rdef.patch, partial):
5!
496
            api_client = rdef.patch.func.__self__.api_client
×
497
        else:
498
            api_client = rdef.patch.__self__.api_client
5✔
499

500
        old_func = api_client.select_header_content_type
5✔
501
        try:
5✔
502
            api_client.select_header_content_type = select_header_content_type_patch
5✔
503
            resp = rdef.patch(**kwargs)
5✔
504
            self._process_response_headers(resp)
5✔
505
            return json.loads(resp.data)
5✔
506
        finally:
507
            api_client.select_header_content_type = old_func
5✔
508

509
    @_normalize_api_exc
5✔
510
    def delete(self, *, dry_run=True, propagation_policy=K8SPropagationPolicy.BACKGROUND, wait=True):
5✔
511
        from kubernetes.client import ApiException
5✔
512
        rdef = self.rdef
5✔
513
        kwargs = {"name": self.name,
5✔
514
                  "_preload_content": False,
515
                  "propagation_policy": propagation_policy.policy
516
                  }
517
        if rdef.namespaced:
5!
518
            kwargs["namespace"] = self.namespace
5✔
519
        if dry_run:
5✔
520
            kwargs["dry_run"] = "All"
5✔
521

522
        result = json.loads(rdef.delete(**kwargs).data)
5✔
523

524
        if wait and not dry_run:
5!
525
            # Wait for the resource to actually disappear by watching for the
526
            # DELETED event. If the resource is already gone before the watch
527
            # opens (race), the watch produces no events; loop with a short
528
            # server-side timeout and re-check existence via get() to break out.
529
            while True:
5✔
530
                for event in self.watch(timeout_seconds=10):
5✔
NEW
531
                    if event["type"] == "DELETED":
4✔
NEW
532
                        return result
4✔
533
                try:
1✔
534
                    self.get()
1✔
535
                except ApiException as e:
1✔
536
                    if e.status == 404:
1!
537
                        return result
1✔
NEW
538
                    raise
×
539

NEW
540
        return result
×
541

542
    def watch(self, *, timeout_seconds=None):
5✔
543
        from kubernetes import watch as k8s_watch
5✔
544
        rdef = self.rdef
5✔
545
        kwargs = {"field_selector": f"metadata.name={self.name}"}
5✔
546
        if rdef.namespaced:
5!
547
            kwargs["namespace"] = self.namespace
5✔
548
        if timeout_seconds is not None:
5!
549
            kwargs["timeout_seconds"] = timeout_seconds
5✔
550
        return k8s_watch.Watch().stream(rdef.list, **kwargs)
5✔
551

552
    @staticmethod
5✔
553
    def get_manifest_key(manifest):
5✔
554
        return K8SResourceKey(to_group_and_version(manifest["apiVersion"])[0],
5✔
555
                              manifest["kind"],
556
                              manifest["metadata"]["name"],
557
                              manifest["metadata"].get("namespace"))
558

559
    @staticmethod
5✔
560
    def get_manifest_description(manifest: dict, source=None):
5✔
561
        api_version = manifest.get("apiVersion")
5✔
562
        kind = manifest.get("kind")
5✔
563
        metadata = manifest.get("metadata")
5✔
564
        name = None
5✔
565
        namespace = None
5✔
566
        if metadata:
5!
567
            name = metadata.get("name")
5✔
568
            namespace = metadata.get("namespace")
5✔
569
        return (f"{api_version or 'unknown'}/{kind or '<unknown>'}/"
5✔
570
                f"{name or '<unknown>'}{'.' + namespace if namespace else ''}")
571

572
    def __eq__(self, other):
5✔
573
        if not isinstance(other, K8SResource):
×
574
            return False
×
575
        return self.key == other.key and self.manifest == other.manifest
×
576

577
    def _process_response_headers(self, resp):
5✔
578
        headers = resp.headers
5✔
579
        warn_headers = headers.get("Warning")
5✔
580
        if warn_headers:
5✔
581
            for warn in K8S_WARNING_HEADER.findall(warn_headers):
5✔
582
                code, _, msg, _ = warn
5✔
583
                code = int(code)
5✔
584
                msg = msg.encode("utf-8").decode("unicode_escape")
5✔
585
                if code == 299:
5!
586
                    self._api_warnings(self, msg)
5✔
587
                else:
588
                    self._logger.warning("Unknown API warning received for resource %s from %s: code %d: %s",
×
589
                                         self, self.source, code, msg)
590

591

592
class K8SResourcePluginMixin:
5✔
593
    def __init__(self):
5✔
594
        self.resource_definitions: MutableMapping[K8SResourceDefKey, K8SResourceDef] = {}
5✔
595
        self.resource_paths: MutableMapping[K8SResourceDefKey, MutableMapping[str, dict]] = {}
5✔
596
        self.resources: MutableMapping[K8SResourceKey, K8SResource] = {}
5✔
597

598
        self.resource_definitions_schema = None
5✔
599

600
    def add_resources(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
5✔
601
        if not source:
5✔
602
            source = calling_frame_source()
5✔
603

604
        if isinstance(manifests, str):
5✔
605
            manifests = list(parse_yaml_docs(manifests, source))
5✔
606

607
        if isinstance(manifests, (Mapping, dict)):
5!
608
            return self.add_resource(manifests, source)
×
609
        else:
610
            return [self.add_resource(m, source) for m in manifests if m]
5✔
611

612
    def add_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
613
        if not source:
5!
614
            source = calling_frame_source()
×
615
        resource = self._create_resource(manifest, source)
5✔
616

617
        try:
5✔
618
            trans_resource = self._transform_resource(list(self.resources.values()), resource)
5✔
619
        except Exception as e:
×
620
            self.logger.error("An error occurred running transformers on %s", resource, exc_info=e)
×
621
            raise
×
622

623
        errors = list(self._validate_resource(trans_resource.manifest, source))
5✔
624
        if errors:
5!
625
            for error in errors:
×
626
                if source:
×
627
                    self.logger.error("Error detected in re-transformed K8S resource %s generated through %s",
×
628
                                      trans_resource, source, exc_info=error)
629
            raise errors[0]
×
630

631
        return self._add_resource(trans_resource, source)
5✔
632

633
    def add_crds(self, manifests: Union[str, list, dict], source: Union[str, Path] = None):
5✔
634
        if not source:
×
635
            source = calling_frame_source()
×
636

637
        if isinstance(manifests, str):
×
638
            manifests = list(parse_yaml_docs(manifests, source))
×
639

640
        if isinstance(manifests, (Mapping, dict)):
×
641
            return self.add_crd(manifests, source)
×
642
        else:
643
            return [self.add_crd(m, source) for m in manifests if m]
×
644

645
    def add_crd(self, manifest: dict, source: Union[str, Path] = None):
5✔
646
        if not source:
5!
647
            source = calling_frame_source()
×
648
        resource = self._create_resource(manifest, source)
5✔
649
        if not resource.is_crd:
5!
650
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
651
            raise ValueError(f"K8S manifest {resource_description} from {source} is not a CRD")
×
652

653
        self._add_crd(resource)
5✔
654
        return resource
5✔
655

656
    def create_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
657
        """Create K8S resource without adding it"""
658
        if not source:
×
659
            source = calling_frame_source()
×
660

661
        return self._create_resource(manifest, source)
×
662

663
    def add_local_resources(self, path: Path, file_type: FileType, source: str = None):
5✔
664
        manifests = load_file(self.logger, path, file_type)
×
665

666
        return [self.add_resource(m, source or path) for m in manifests if m]
×
667

668
    def add_remote_resources(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
5✔
669
                             source: str = None):
670
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
×
671

672
        return [self.add_resource(m, source or url) for m in manifests if m]
×
673

674
    def add_local_crds(self, path: Path, file_type: FileType, source: str = None):
5✔
675
        manifests = load_file(self.logger, path, file_type)
5✔
676

677
        return [self.add_crd(m, source or path) for m in manifests if m]
5✔
678

679
    def add_remote_crds(self, url: str, file_type: FileType, *, sub_category: Optional[str] = None,
5✔
680
                        source: str = None):
681
        manifests = load_remote_file(self.logger, url, file_type, sub_category=sub_category)
5✔
682

683
        return [self.add_crd(m, source or url) for m in manifests if m]
5✔
684

685
    def get_api_versions(self):
5✔
686
        api_versions = set()
5✔
687
        for rdef in self.resource_definitions:
5✔
688
            api_version = f"{f'{rdef.group}/' if rdef.group else ''}{rdef.version}"
5✔
689
            if api_version not in api_versions:
5✔
690
                api_versions.add(api_version)
5✔
691
        return sorted(api_versions)
5✔
692

693
    def _create_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
694
        resource_description = K8SResource.get_manifest_description(manifest, source)
5✔
695

696
        new_manifest = self._patch_manifest(manifest, resource_description)
5✔
697
        if new_manifest != manifest:
5!
698
            manifest = new_manifest
×
699
            resource_description = K8SResource.get_manifest_description(manifest, source)
×
700

701
        self.logger.debug("Validating K8S manifest for %s", resource_description)
5✔
702
        errors = list(self._validate_resource(manifest, source))
5✔
703
        if errors:
5!
704
            for error in errors:
×
705
                self.logger.error("Error detected in K8S manifest %s from %s: \n%s",
×
706
                                  resource_description, source or "<unknown>", yaml.safe_dump(manifest, None),
707
                                  exc_info=error)
708
            raise errors[0]
×
709

710
        rdef = self._get_manifest_rdef(manifest)
5✔
711
        return K8SResource(manifest, rdef, source)
5✔
712

713
    def _add_resource(self, resource: K8SResource, source):
5✔
714
        if resource.key in self.resources:
5!
715
            existing_resource = self.resources[resource.key]
×
716
            if resource != existing_resource:
×
717
                raise ValidationError("resource %s from %s already exists and was added from %s" %
×
718
                                      (resource.key, resource.source, existing_resource.source))
719
            self.logger.trace("K8S resource for %s from %s is already present and is identical", resource, source)
×
720
            return existing_resource
×
721

722
        self.logger.info("Adding K8S resource for %s from %s", resource, source)
5✔
723
        self.resources[resource.key] = resource
5✔
724

725
        if resource.is_crd:
5✔
726
            self._add_crd(resource)
5✔
727

728
        return resource
5✔
729

730
    def _patch_manifest(self,
5✔
731
                        manifest: dict,
732
                        resource_description: str):
733
        return manifest
5✔
734

735
    def _transform_resource(self,
5✔
736
                            resources: Sequence[K8SResource],
737
                            resource: K8SResource) -> K8SResource:
738
        return resource
5✔
739

740
    def _filter_resources(self, func: Callable[[K8SResource], bool]):
5✔
741
        yield from filter(func, self.resources.values())
×
742

743
    def _validate_resource(self, manifest: dict, source: Union[str, Path] = None):
5✔
744
        for error in self._yield_manifest_rdef(manifest):
5✔
745
            if isinstance(error, Exception):
5!
746
                yield error
×
747
            else:
748
                rdef = error
5✔
749
                k8s_validator = K8SValidator(rdef.schema,
5✔
750
                                             format_checker=k8s_format_checker)
751
                yield from k8s_validator.iter_errors(manifest)
5✔
752

753
    def _get_manifest_rdef(self, manifest):
5✔
754
        for error in self._yield_manifest_rdef(manifest):
5!
755
            if isinstance(error, Exception):
5!
756
                raise error
×
757
            else:
758
                return error
5✔
759

760
    def _yield_manifest_rdef(self, manifest):
5✔
761
        error = None
5✔
762
        for error in K8S_MINIMAL_RESOURCE_VALIDATOR.iter_errors(manifest):
5!
763
            yield error
×
764

765
        if error:
5!
766
            return
×
767

768
        key = K8SResourceDefKey(*to_group_and_version(manifest["apiVersion"]), manifest["kind"])
5✔
769

770
        try:
5✔
771
            yield self.resource_definitions[key]
5✔
772
        except KeyError:
5✔
773
            yield ValidationError("%s is not a defined Kubernetes resource" % (key,),
×
774
                                  validator=K8S_MINIMAL_RESOURCE_VALIDATOR,
775
                                  validator_value=key,
776
                                  instance=manifest,
777
                                  schema=K8S_MINIMAL_RESOURCE_SCHEMA)
778

779
    def _add_crd(self, resource: K8SResource):
5✔
780
        for crd in K8SResourceDef.from_resource(resource):
5✔
781
            self.logger.info("Adding K8S CRD definition %s", crd.key)
5✔
782
            self.resource_definitions[crd.key] = crd
5✔
783

784
    def _populate_resource_definitions(self):
5✔
785
        k8s_def = self.resource_definitions_schema
5✔
786

787
        def k8s_resource_def_key(v: Mapping[str, Union[list, Mapping]]) -> Iterable[K8SResourceDefKey]:
5✔
788
            gvks = v.get("x-kubernetes-group-version-kind")
5✔
789
            if gvks:
5✔
790
                if isinstance(gvks, Mapping):
5✔
791
                    gvk = gvks
5✔
792
                    yield K8SResourceDefKey(gvk["group"],
5✔
793
                                            gvk["version"],
794
                                            gvk["kind"])
795
                else:
796
                    for gvk in gvks:
5✔
797
                        yield K8SResourceDefKey(gvk["group"],
5✔
798
                                                gvk["version"],
799
                                                gvk["kind"])
800

801
        paths = k8s_def["paths"]
5✔
802
        for path, actions in paths.items():
5✔
803
            path_rdk = None
5✔
804
            path_actions = []
5✔
805
            for action, action_details in actions.items():
5✔
806
                if action == "parameters":
5✔
807
                    continue
5✔
808
                rdks = list(k8s_resource_def_key(action_details))
5✔
809
                if rdks:
5✔
810
                    assert len(rdks) == 1
5✔
811
                    rdk = rdks[0]
5✔
812
                    if path_rdk:
5✔
813
                        if path_rdk != rdk:
5!
814
                            raise ValueError(f"Encountered path action x-kubernetes-group-version-kind conflict: "
×
815
                                             f"{path}: {actions}")
816
                        path_actions.append(action_details["x-kubernetes-action"])
5✔
817
                    else:
818
                        path_rdk = rdk
5✔
819

820
            if path_rdk:
5✔
821
                rdef_paths = self.resource_paths.get(path_rdk)
5✔
822
                if not rdef_paths:
5✔
823
                    rdef_paths = {}
5✔
824
                    self.resource_paths[path_rdk] = rdef_paths
5✔
825
                rdef_paths[path] = actions
5✔
826

827
        for k, schema in k8s_def["definitions"].items():
5✔
828
            # This short-circuits the resolution of the references to the top of the document
829
            schema["definitions"] = k8s_def["definitions"]
5✔
830
            for key in k8s_resource_def_key(schema):
5✔
831
                for rdef in K8SResourceDef.from_manifest(key, schema, self.resource_paths):
5✔
832
                    self.resource_definitions[key] = rdef
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc