• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 24621395421

19 Apr 2026 04:58AM UTC coverage: 81.992% (+4.1%) from 77.939%
24621395421

push

github

web-flow
Add OpenAPI v3 validation with CEL rules; extract validator into k8s_schema (#104)

## Summary

Extracts all OpenAPI validation out of `k8s.py` / `k8s_api.py` into a
new `kubernator/plugins/k8s_schema/` package, and adds an OpenAPI v3
validator with client-side CEL rule evaluation as the default on
clusters ≥ 1.27.

- **`k8s_api.py` no longer does validation** — `K8SResourcePluginMixin`
holds a single `self.validator` and delegates every manifest check
(minimal envelope → rdef lookup → full schema → CEL rules) to it.
Dependency graph is one-way: `k8s_schema → k8s_api`.
- **Two validators behind a factory.** `SwaggerV2Validator` preserves
current behaviour (swagger.json + OAS31). `OpenAPIV3Validator` is new:
cluster-first `/openapi/v3` discovery with GitHub fallback, lazy
per-group fetch, transitive `$ref` closure across group documents.
- **K8s extensions enforced client-side** under v3:
`x-kubernetes-list-type` (`atomic` / `set` / `map` with
`list-map-keys`), `-preserve-unknown-fields`, `-embedded-resource`,
`-int-or-string`.
- **CEL rules evaluated client-side.** Lazy-compiled, cached by rule
text for the run. Ports of the K8s CEL libraries — `lists`, `regex`,
`format.named(...)`, `quantity`, `IP`, `CIDR` — plus a compatibility
shim for `optional.of` / `optional.none` / `hasValue` / `value` /
`orValue` so `optionalSelf` / `optionalOldSelf` work. Transition rules
fire in the apply path against the cluster's current state.
- **Selection.** `ktor.k8s.register()` gains `openapi_version` (`auto` /
`v2` / `v3`) and `openapi_source` (`auto` / `cluster` / `github`) kwargs
(also readable from context). `auto` gates on server minor ≥ 1.27 and
falls back to v2 on any v3 failure. `istio.py` is migrated to the same
factory.

New dep: `cel-python ~=0.5`.

## Test plan

- [x] 111 unit tests (v2 parity; v3 lazy fetch; api_versions without
sub-doc fetch; source fallback; every extension keyword; CEL walker +
evaluator; 7 extension libraries; factory selection / vers... (continued)

943 of 1328 branches covered (71.01%)

Branch coverage included in aggregate %.

1040 of 1082 new or added lines in 18 files covered. (96.12%)

6 existing lines in 2 files now uncovered.

3979 of 4675 relevant lines covered (85.11%)

4.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.77
/src/main/python/kubernator/plugins/k8s.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19

20
import json
5✔
21
import logging
5✔
22
import re
5✔
23
import sys
5✔
24
import types
5✔
25
from collections.abc import Mapping
5✔
26
from functools import partial
5✔
27
from importlib.metadata import version as pkg_version
5✔
28
from pathlib import Path
5✔
29
from typing import Iterable, Callable, Sequence, Optional
5✔
30

31
import jsonpatch
5✔
32
import yaml
5✔
33

34
from kubernator.api import (KubernatorPlugin,
5✔
35
                            Globs,
36
                            scan_dir,
37
                            load_file,
38
                            FileType,
39
                            StripNL,
40
                            install_python_k8s_client,
41
                            TemplateEngine,
42
                            calling_frame_source,
43
                            parse_yaml_docs)
44
from kubernator.merge import extract_merge_instructions, apply_merge_instructions
5✔
45
from kubernator.plugins import k8s_schema
5✔
46
from kubernator.plugins.k8s_api import (K8SResourcePluginMixin,
5✔
47
                                        K8SResource,
48
                                        K8SResourcePatchType,
49
                                        K8SPropagationPolicy,
50
                                        api_exc_format_body)
51

52
logger = logging.getLogger("kubernator.k8s")
5✔
53
proc_logger = logger.getChild("proc")
5✔
54
stdout_logger = StripNL(proc_logger.info)
5✔
55
stderr_logger = StripNL(proc_logger.warning)
5✔
56

57
FIELD_VALIDATION_STRICT_MARKER = "strict decoding error: "
5✔
58
VALID_FIELD_VALIDATION = ("Ignore", "Warn", "Strict")
5✔
59

60

61
def final_resource_validator(resources: Sequence[K8SResource],
5✔
62
                             resource: K8SResource,
63
                             error: Callable[..., Exception]) -> Iterable[Exception]:
64
    final_key = resource.get_manifest_key(resource.manifest)
5✔
65
    if final_key != resource.key:
5!
66
        yield error("Illegal change of identifiers of the resource "
×
67
                    "%s from %s have been changed to %s",
68
                    resource.key, resource.source, final_key)
69

70
    if resource.rdef.namespaced and not resource.namespace:
5!
71
        yield error("Namespaced resource %s from %s is missing the required namespace",
×
72
                    resource, resource.source)
73

74

75
def normalize_pkg_version(v: str):
5✔
76
    v_split = v.split(".")
5✔
77
    rev = v_split[-1]
5✔
78
    if not rev.isdigit():
5✔
79
        new_rev = ""
5✔
80
        for c in rev:
5!
81
            if not c.isdigit():
5✔
82
                break
5✔
83
            new_rev += c
5✔
84
        v_split[-1] = new_rev
5✔
85
    return tuple(map(int, v_split))
5✔
86

87

88
class KubernetesPlugin(KubernatorPlugin, K8SResourcePluginMixin):
5✔
89
    logger = logger
5✔
90

91
    _name = "k8s"
5✔
92

93
    def __init__(self):
5✔
94
        super().__init__()
5✔
95
        self.context = None
5✔
96

97
        self.embedded_pkg_version = self._get_kubernetes_client_version()
5✔
98

99
        self._transformers = []
5✔
100
        self._validators = []
5✔
101
        self._manifest_patchers = []
5✔
102
        self._summary = 0, 0, 0
5✔
103
        self._template_engine = TemplateEngine(logger)
5✔
104

105
    def set_context(self, context):
5✔
106
        self.context = context
5✔
107

108
    def register(self,
5✔
109
                 field_validation="Warn",
110
                 field_validation_warn_fatal=True,
111
                 disable_client_patches=False,
112
                 openapi_version="auto",
113
                 openapi_source="auto"):
114
        self.context.app.register_plugin("kubeconfig")
5✔
115

116
        if field_validation not in VALID_FIELD_VALIDATION:
5!
117
            raise ValueError("'field_validation' must be one of %s" % (", ".join(VALID_FIELD_VALIDATION)))
×
118

119
        if openapi_version not in ("auto", "v2", "v3"):
5!
NEW
120
            raise ValueError("'openapi_version' must be auto|v2|v3")
×
121
        if openapi_source not in ("auto", "cluster", "github"):
5!
NEW
122
            raise ValueError("'openapi_source' must be auto|cluster|github")
×
123

124
        context = self.context
5✔
125
        context.globals.k8s = dict(patch_field_excludes=("^/metadata/managedFields",
5✔
126
                                                         "^/metadata/generation",
127
                                                         "^/metadata/creationTimestamp",
128
                                                         "^/metadata/resourceVersion",
129
                                                         ),
130
                                   openapi_version=openapi_version,
131
                                   openapi_source=openapi_source,
132
                                   immutable_changes={("apps", "DaemonSet"): K8SPropagationPolicy.BACKGROUND,
133
                                                      ("apps", "StatefulSet"): K8SPropagationPolicy.ORPHAN,
134
                                                      ("apps", "Deployment"): K8SPropagationPolicy.ORPHAN,
135
                                                      ("storage.k8s.io", "StorageClass"): K8SPropagationPolicy.ORPHAN,
136
                                                      (None, "Pod"): K8SPropagationPolicy.BACKGROUND,
137
                                                      ("batch", "Job"): K8SPropagationPolicy.ORPHAN,
138
                                                      },
139
                                   default_includes=Globs(["*.yaml", "*.yml"], True),
140
                                   default_excludes=Globs([".*"], True),
141
                                   add_resources=self.add_resources,
142
                                   load_resources=self.api_load_resources,
143
                                   load_remote_resources=self.api_load_remote_resources,
144
                                   load_crds=self.api_load_crds,
145
                                   import_cluster_crds=self.api_import_cluster_crds,
146
                                   load_remote_crds=self.api_load_remote_crds,
147
                                   add_transformer=self.api_add_transformer,
148
                                   remove_transformer=self.api_remove_transformer,
149
                                   add_validator=self.api_remove_validator,
150
                                   add_manifest_patcher=self.api_add_manifest_patcher,
151
                                   get_api_versions=self.get_api_versions,
152
                                   create_resource=self.create_resource,
153
                                   disable_client_patches=disable_client_patches,
154
                                   field_validation=field_validation,
155
                                   field_validation_warn_fatal=field_validation_warn_fatal,
156
                                   field_validation_warnings=0,
157
                                   resource_generator=self.resource_generator,
158
                                   resource=self.resource,
159
                                   conflict_retry_delay=0.3,
160
                                   _k8s=self,
161
                                   )
162
        context.k8s = dict(default_includes=Globs(context.globals.k8s.default_includes),
5✔
163
                           default_excludes=Globs(context.globals.k8s.default_excludes)
164
                           )
165
        self.api_add_validator(final_resource_validator)
5✔
166

167
    def handle_init(self):
5✔
168
        pass
5✔
169

170
    def handle_start(self):
5✔
171
        self.context.kubeconfig.register_change_notifier(self._kubeconfig_changed)
5✔
172
        self.setup_client()
5✔
173

174
    def _kubeconfig_changed(self):
5✔
175
        self.setup_client()
×
176

177
    def _get_kubernetes_client_version(self):
5✔
178
        return pkg_version("kubernetes").split(".")
5✔
179

180
    def setup_client(self):
5✔
181
        k8s = self.context.k8s
5✔
182
        if "server_version" not in k8s:
5!
183
            self._setup_client()
5✔
184

185
        server_minor = k8s.server_version[1]
5✔
186

187
        logger.info("Using Kubernetes client version =~%s.0 for server version %s",
5✔
188
                    server_minor, ".".join(k8s.server_version))
189
        pkg_dir = install_python_k8s_client(self.context.app.run_passthrough_capturing, server_minor, logger,
5✔
190
                                            stdout_logger, stderr_logger, k8s.disable_client_patches)
191

192
        modules_to_delete = []
5✔
193
        for k, v in sys.modules.items():
5✔
194
            if k == "kubernetes" or k.startswith("kubernetes."):
5✔
195
                modules_to_delete.append(k)
5✔
196
        for k in modules_to_delete:
5✔
197
            del sys.modules[k]
5✔
198

199
        logger.info("Adding sys.path reference to %s", pkg_dir)
5✔
200
        sys.path.insert(0, str(pkg_dir))
5✔
201
        self.embedded_pkg_version = self._get_kubernetes_client_version()
5✔
202
        logger.info("Switching to Kubernetes client version %s", ".".join(self.embedded_pkg_version))
5✔
203
        self._setup_client()
5✔
204

205
        self.validator = k8s_schema.make_validator(self.context)
5✔
206

207
    def _setup_client(self):
5✔
208
        from kubernetes import client
5✔
209

210
        context = self.context
5✔
211
        k8s = context.k8s
5✔
212

213
        k8s.client = self._setup_k8s_client()
5✔
214
        version = client.VersionApi(k8s.client).get_code()
5✔
215
        # Strip vendor-specific suffixes so OpenAPI lookups hit upstream tags.
216
        # EKS/GKE use a dash (e.g. v1.28.3-eks-..., v1.28.3-gke.100);
217
        # k3s uses a plus sign (e.g. v1.35.3+k3s1).
218
        git_version = version.git_version.split("-")[0].split("+")[0]
5✔
219

220
        k8s.server_version = git_version[1:].split(".")
5✔
221
        k8s.server_git_version = git_version
5✔
222

223
        logger.info("Found Kubernetes %s on %s", k8s.server_git_version, k8s.client.configuration.host)
5✔
224

225
        K8SResource._k8s_client_version = normalize_pkg_version(pkg_version("kubernetes"))
5✔
226
        K8SResource._k8s_field_validation = k8s.field_validation
5✔
227
        K8SResource._k8s_field_validation_patched = not k8s.disable_client_patches
5✔
228
        K8SResource._logger = self.logger
5✔
229
        K8SResource._api_warnings = self._api_warnings
5✔
230

231
    def _api_warnings(self, resource, warn):
5✔
232
        k8s = self.context.k8s
5✔
233
        self.context.globals.k8s.field_validation_warnings += 1
5✔
234

235
        log = self.logger.warning
5✔
236
        if k8s.field_validation_warn_fatal:
5✔
237
            log = self.logger.error
5✔
238

239
        log("FAILED FIELD VALIDATION on resource %s from %s: %s", resource, resource.source, warn)
5✔
240

241
    def handle_before_dir(self, cwd: Path):
5✔
242
        context = self.context
5✔
243
        context.k8s.default_includes = Globs(context.k8s.default_includes)
5✔
244
        context.k8s.default_excludes = Globs(context.k8s.default_excludes)
5✔
245
        context.k8s.includes = Globs(context.k8s.default_includes)
5✔
246
        context.k8s.excludes = Globs(context.k8s.default_excludes)
5✔
247

248
    def handle_after_dir(self, cwd: Path):
5✔
249
        context = self.context
5✔
250
        k8s = context.k8s
5✔
251

252
        for f in scan_dir(logger, cwd, lambda d: d.is_file(), k8s.excludes, k8s.includes):
5✔
253
            p = cwd / f.name
5✔
254
            display_p = context.app.display_path(p)
5✔
255
            logger.debug("Adding Kubernetes manifest from %s", display_p)
5✔
256

257
            manifests = load_file(logger, p, FileType.YAML, display_p,
5✔
258
                                  self._template_engine,
259
                                  {"ktor": context}
260
                                  )
261

262
            for manifest in manifests:
5✔
263
                if manifest:
5!
264
                    self.add_resource(manifest, display_p)
5✔
265

266
    def resource_generator(self):
5✔
267
        yield from self.resources.values()
5✔
268

269
    def resource(self, manifest, source=None):
5✔
270
        from kubernetes import client
5✔
271
        if not source:
5!
272
            source = calling_frame_source()
5✔
273
        if isinstance(manifest, str):
5!
274
            docs = [m for m in parse_yaml_docs(manifest, source) if m]
5✔
275
            if len(docs) != 1:
5!
276
                raise ValueError(f"ktor.k8s.resource() expects a single manifest document, got {len(docs)} from {source}")
×
277
            manifest = docs[0]
5✔
278
        res = self._create_resource(manifest, source)
5✔
279
        res.rdef.populate_api(client, self.context.k8s.client)
5✔
280
        return res
5✔
281

282
    def handle_apply(self):
5✔
283
        context = self.context
5✔
284
        k8s = context.k8s
5✔
285

286
        self._validate_resources()
5✔
287

288
        cmd = context.app.args.command
5✔
289
        file_name = context.app.args.file
5✔
290
        file_format = context.app.args.output_format
5✔
291
        dry_run = context.app.args.dry_run
5✔
292
        dump = cmd == "dump"
5✔
293

294
        status_msg = f"{' (dump only)' if dump else ' (dry run)' if dry_run else ''}"
5✔
295
        if dump:
5✔
296
            logger.info("Will dump the changes into a file %s in %s format", file_name or "<stdout>", file_format)
5✔
297

298
        patch_field_excludes = [re.compile(e) for e in context.globals.k8s.patch_field_excludes]
5✔
299
        dump_results = []
5✔
300
        total_created, total_patched, total_deleted = 0, 0, 0
5✔
301
        for resource in k8s.resource_generator():
5✔
302
            if dump:
5✔
303
                resource_id = {"apiVersion": resource.api_version,
5✔
304
                               "kind": resource.kind,
305
                               "name": resource.name
306
                               }
307

308
                def patch_func(patch):
5✔
309
                    if resource.rdef.namespaced:
5✔
310
                        resource_id["namespace"] = resource.namespace
5✔
311
                    method_descriptor = {"method": "patch",
5✔
312
                                         "resource": resource_id,
313
                                         "body": patch
314
                                         }
315
                    dump_results.append(method_descriptor)
5✔
316
                    return resource.manifest
5✔
317

318
                def create_func():
5✔
319
                    method_descriptor = {"method": "create",
5✔
320
                                         "body": resource.manifest}
321
                    dump_results.append(method_descriptor)
5✔
322
                    return resource.manifest
5✔
323

324
                def delete_func(*, propagation_policy):
5✔
325
                    method_descriptor = {"method": "delete",
×
326
                                         "resource": resource_id,
327
                                         "propagation_policy": propagation_policy.policy
328
                                         }
329
                    dump_results.append(method_descriptor)
×
330
                    return None
×
331
            else:
332
                patch_func = partial(resource.patch, patch_type=K8SResourcePatchType.JSON_PATCH, dry_run=dry_run)
5✔
333
                create_func = partial(resource.create, dry_run=dry_run)
5✔
334
                delete_func = partial(resource.delete, dry_run=dry_run)
5✔
335

336
            created, patched, deleted, result = self._apply_resource(dry_run,
5✔
337
                                                                     patch_field_excludes,
338
                                                                     resource,
339
                                                                     patch_func,
340
                                                                     create_func,
341
                                                                     delete_func,
342
                                                                     status_msg)
343

344
            total_created += created
5✔
345
            total_patched += patched
5✔
346
            total_deleted += deleted
5✔
347

348
        if ((dump or dry_run) and
5✔
349
                k8s.field_validation_warn_fatal and self.context.globals.k8s.field_validation_warnings):
350
            msg = ("There were %d field validation warnings and the warnings are fatal!" %
5✔
351
                   self.context.globals.k8s.field_validation_warnings)
352
            logger.fatal(msg)
5✔
353
            raise RuntimeError(msg)
5✔
354

355
        if dump:
5✔
356
            file = open(file_name, "w") if file_name else sys.stdout
5✔
357
            try:
5✔
358
                if file_format in ("json", "json-pretty"):
5✔
359
                    json.dump(dump_results, file, sort_keys=True,
5✔
360
                              indent=4 if file_format == "json-pretty" else None)
361
                else:
362
                    yaml.safe_dump(dump_results, file)
5✔
363
            finally:
364
                if file_name:
5✔
365
                    file.close()
5✔
366
        else:
367
            self._summary = total_created, total_patched, total_deleted
5✔
368

369
    def handle_summary(self):
5✔
370
        total_created, total_patched, total_deleted = self._summary
5✔
371
        logger.info("Created %d, patched %d, deleted %d resources", total_created, total_patched, total_deleted)
5✔
372

373
    def api_load_resources(self, path: Path, file_type: str):
5✔
374
        return self.add_local_resources(path, FileType[file_type.upper()])
×
375

376
    def api_load_remote_resources(self, url: str, file_type: str, file_category=None):
5✔
377
        return self.add_remote_resources(url, FileType[file_type.upper()], sub_category=file_category)
×
378

379
    def api_load_crds(self, path: Path, file_type: str):
5✔
380
        return self.add_local_crds(path, FileType[file_type.upper()])
5✔
381

382
    def api_load_remote_crds(self, url: str, file_type: str, file_category=None):
5✔
383
        return self.add_remote_crds(url, FileType[file_type.upper()], sub_category=file_category)
5✔
384

385
    def api_import_cluster_crds(self):
5✔
386
        context = self.context
×
387
        k8s = context.k8s
×
388
        client = k8s.client
×
389
        from kubernetes import client as client_module
×
390

391
        api = client_module.ApiextensionsV1Api(client)
×
392
        crds = api.list_custom_resource_definition(watch=False)
×
393
        for crd in crds.items:
×
394
            manifest = client.sanitize_for_serialization(crd)
×
395
            manifest["apiVersion"] = "apiextensions.k8s.io/v1"
×
396
            manifest["kind"] = "CustomResourceDefinition"
×
397
            self.add_crd(manifest)
×
398

399
    def api_add_transformer(self, transformer):
5✔
400
        if transformer not in self._transformers:
5!
401
            self._transformers.append(transformer)
5✔
402

403
    def api_add_validator(self, validator):
5✔
404
        if validator not in self._validators:
5!
405
            self._validators.append(validator)
5✔
406

407
    def api_add_manifest_patcher(self, patcher):
5✔
408
        if patcher not in self._manifest_patchers:
×
409
            self._manifest_patchers.append(patcher)
×
410

411
    def api_remove_transformer(self, transformer):
5✔
412
        if transformer in self._transformers:
5!
413
            self._transformers.remove(transformer)
5✔
414

415
    def api_remove_validator(self, validator):
5✔
416
        if validator not in self._validators:
×
417
            self._validators.remove(validator)
×
418

419
    def api_validation_error(self, msg, *args):
5✔
420
        frame = sys._getframe().f_back
×
421
        tb = None
×
422
        while True:
×
423
            if not frame:
×
424
                break
×
425
            tb = types.TracebackType(tb, frame, frame.f_lasti, frame.f_lineno)
×
426
            frame = frame.f_back
×
427
        return ValueError((msg % args) if args else msg).with_traceback(tb)
×
428

429
    def _patch_manifest(self,
5✔
430
                        manifest: dict,
431
                        resource_description: str):
432
        for patcher in reversed(self._manifest_patchers):
5!
433
            logger.debug("Applying patcher %s to %s",
×
434
                         getattr(patcher, "__name__", patcher),
435
                         resource_description)
436
            manifest = patcher(manifest, resource_description) or manifest
×
437

438
        return manifest
5✔
439

440
    def _transform_resource(self, resources: Sequence[K8SResource], resource: K8SResource) -> K8SResource:
5✔
441
        for transformer in reversed(self._transformers):
5✔
442
            logger.debug("Applying transformer %s to %s from %s",
5✔
443
                         getattr(transformer, "__name__", transformer),
444
                         resource, resource.source)
445
            resource = transformer(resources, resource) or resource
5✔
446

447
        return resource
5✔
448

449
    def _validate_resources(self):
5✔
450
        errors: list[Exception] = []
5✔
451
        for resource in self.resources.values():
5✔
452
            for validator in reversed(self._validators):
5✔
453
                logger.debug("Applying validator %s to %s from %s",
5✔
454
                             getattr(validator, "__name__", validator),
455
                             resource, resource.source)
456
                errors.extend(validator(self.resources, resource, self.api_validation_error))
5✔
457
        if errors:
5!
458
            for error in errors:
×
459
                logger.error("Validation error: %s", error)
×
460
            raise errors[0]
×
461

462
    def _apply_resource(self,
5✔
463
                        dry_run,
464
                        patch_field_excludes: Iterable[re.compile],
465
                        resource: K8SResource,
466
                        patch_func: Callable[[Iterable[dict]], Optional[dict]],
467
                        create_func: Callable[[], Optional[dict]],
468
                        delete_func: Callable[[K8SPropagationPolicy], None],
469
                        status_msg):
470
        from kubernetes import client
5✔
471
        from kubernetes.client.rest import ApiException
5✔
472

473
        rdef = resource.rdef
5✔
474
        rdef.populate_api(client, self.context.k8s.client)
5✔
475

476
        def handle_400_strict_validation_error(e: ApiException):
5✔
477
            if e.status == 400:
5!
478
                # Assumes the body has been parsed
479
                status = e.body
5✔
480
                if status["status"] == "Failure":
5!
481
                    if FIELD_VALIDATION_STRICT_MARKER in status["message"]:
5!
482
                        message = status["message"]
5✔
483
                        messages = message[message.find(FIELD_VALIDATION_STRICT_MARKER) +
5✔
484
                                           len(FIELD_VALIDATION_STRICT_MARKER):].split(",")
485
                        for m in messages:
5✔
486
                            self._api_warnings(resource, m.strip())
5✔
487

488
                        raise e from None
5✔
489
                    else:
490
                        logger.error("FAILED MODIFYING resource %s from %s: %s",
×
491
                                     resource, resource.source, status["message"])
492
                        raise e from None
×
493

494
        def create(exists_ok=False):
5✔
495
            logger.info("Creating resource %s%s%s", resource, status_msg,
5✔
496
                        " (ignoring existing)" if exists_ok else "")
497
            try:
5✔
498
                return create_func()
5✔
499
            except ApiException as __e:
5✔
500
                if exists_ok and __e.status == 409 and __e.body["reason"] == "AlreadyExists":
5!
501
                    return None
×
502
                raise
5✔
503

504
        merge_instrs, normalized_manifest = extract_merge_instructions(resource.manifest, resource)
5✔
505
        if merge_instrs:
5✔
506
            logger.trace("Normalized manifest (no merge instructions) for resource %s: %s", resource,
5✔
507
                         normalized_manifest)
508
        else:
509
            normalized_manifest = resource.manifest
5✔
510

511
        logger.debug("Applying resource %s%s", resource, status_msg)
5✔
512
        try:
5✔
513
            remote_resource = resource.get()
5✔
514
            logger.trace("Current resource %s: %s", resource, remote_resource)
5✔
515
            # v3 evaluates transition rules here (oldSelf bound to the
516
            # server's current state). v2 has no transition rules and
517
            # the resource was already schema-validated at add_resource
518
            # time, so skip a second pass.
519
            if self.validator.version == "v3":
5✔
520
                transition_errors = list(
5✔
521
                    self.validator.iter_errors(resource.manifest, resource.rdef,
522
                                               old_manifest=remote_resource))
523
                if transition_errors:
5!
NEW
524
                    for err in transition_errors:
×
NEW
525
                        logger.error("Transition rule violation on %s from %s: %s",
×
526
                                     resource, resource.source, err)
NEW
527
                    raise transition_errors[0]
×
528
        except ApiException as e:
5✔
529
            try:
5✔
530
                if e.status == 404:
5!
531
                    try:
5✔
532
                        return 1, 0, 0, create()
5✔
533
                    except ApiException as e:
5✔
534
                        if not handle_400_strict_validation_error(e):
5!
535
                            raise
×
536
                else:
537
                    raise
×
538
            except ApiException as _e:
5✔
539
                api_exc_format_body(_e)
5✔
540
                raise
5✔
541
        else:
542
            while True:
5✔
543
                logger.trace("Attempting to retrieve a normalized patch for resource %s: %s",
5✔
544
                             resource, normalized_manifest)
545
                try:
5✔
546
                    merged_resource = resource.patch(normalized_manifest,
5✔
547
                                                     patch_type=K8SResourcePatchType.SERVER_SIDE_PATCH,
548
                                                     dry_run=True,
549
                                                     force=True)
550
                except ApiException as e:
5✔
551
                    try:
5✔
552
                        if e.status == 422:
5!
553
                            status = e.body
5✔
554
                            # Assumes the body has been unmarshalled
555
                            details = status["details"]
5✔
556
                            immutable_key = details.get("group"), details["kind"]
5✔
557

558
                            try:
5✔
559
                                propagation_policy = self.context.k8s.immutable_changes[immutable_key]
5✔
560
                            except KeyError:
×
561
                                raise e from None
×
562
                            else:
563
                                for cause in details["causes"]:
5!
564
                                    if (
5!
565
                                            cause["reason"] == "FieldValueInvalid" and
566
                                            "field is immutable" in cause["message"]
567
                                            or
568
                                            cause["reason"] == "FieldValueForbidden" and
569
                                            ("Forbidden: updates to" in cause["message"]
570
                                             or
571
                                             "Forbidden: pod updates" in cause["message"])
572
                                    ):
573
                                        logger.info("Deleting resource %s (cascade %s)%s", resource,
5✔
574
                                                    propagation_policy.policy,
575
                                                    status_msg)
576
                                        delete_func(propagation_policy=propagation_policy)
5✔
577
                                        return 1, 0, 1, create(exists_ok=dry_run)
5✔
578
                                raise
×
579
                        else:
580
                            if not handle_400_strict_validation_error(e):
×
581
                                raise
×
582
                    except ApiException as _e:
×
583
                        api_exc_format_body(_e)
×
584
                        raise
×
585

586
                else:
587
                    logger.trace("Merged resource %s: %s", resource, merged_resource)
5✔
588
                    if merge_instrs:
5✔
589
                        apply_merge_instructions(merge_instrs, normalized_manifest, merged_resource, logger, resource)
5✔
590

591
                    patch = jsonpatch.make_patch(remote_resource, merged_resource)
5✔
592

593
                    resource_version = merged_resource["metadata"]["resourceVersion"]
5✔
594
                    resource_uid = merged_resource["metadata"]["uid"]
5✔
595
                    logger.trace("Resource %s adding resourceVersion %s and UID %s tests", resource, resource_version,
5✔
596
                                 resource_uid)
597
                    patch.patch.append({"op": "test", "path": "/metadata/uid", "value": resource_uid})
5✔
598
                    patch.patch.append({"op": "test", "path": "/metadata/resourceVersion", "value": resource_version})
5✔
599

600
                    logger.trace("Resource %s initial patches are: %s", resource, patch)
5✔
601
                    patch = self._filter_resource_patch(patch, patch_field_excludes)
5✔
602
                    logger.trace("Resource %s final patches are: %s", resource, patch)
5✔
603
                    if patch:
5!
604
                        logger.info("Patching resource %s%s", resource, status_msg)
5✔
605
                        try:
5✔
606
                            return 0, 1, 0, patch_func(patch)
5✔
607
                        except ApiException as e:
5✔
608
                            if e.status == 409:
5✔
609
                                logger.warning("Patching resource %s%s encountered a conflict - will retry: \n%s",
5✔
610
                                               resource, status_msg, yaml.dump(e.body))
611
                                continue
5✔
612
                            raise
5✔
613
                    else:
614
                        logger.info("Nothing to patch for resource %s", resource)
×
615
                        return 0, 0, 0, None
×
616

617
    def _filter_resource_patch(self, patch: Iterable[Mapping], excludes: Iterable[re.compile]):
5✔
618
        result = []
5✔
619
        for op in patch:
5✔
620
            if op["op"] != "test":
5✔
621
                path = op["path"]
5✔
622
                excluded = False
5✔
623
                for exclude in excludes:
5✔
624
                    if exclude.match(path):
5✔
625
                        logger.trace("Excluding %r from patch %s", op, patch)
5✔
626
                        excluded = True
5✔
627
                        break
5✔
628
                if excluded:
5✔
629
                    continue
5✔
630
            result.append(op)
5✔
631
        return result
5✔
632

633
    def _setup_k8s_client(self):
5✔
634
        from kubernetes import client
5✔
635
        from kubernetes.config import load_incluster_config, load_kube_config, ConfigException
5✔
636

637
        try:
5✔
638
            logger.debug("Trying K8S in-cluster configuration")
5✔
639
            load_incluster_config()
5✔
640
            logger.info("Running K8S with in-cluster configuration")
×
641
        except ConfigException as e:
5✔
642
            logger.trace("K8S in-cluster configuration failed", exc_info=e)
5✔
643
            logger.debug("Initializing K8S with kubeconfig configuration")
5✔
644
            load_kube_config(config_file=self.context.kubeconfig.kubeconfig)
5✔
645

646
        k8s_client = client.ApiClient()
5✔
647

648
        # Patch the header content type selector to allow json patch
649
        k8s_client._select_header_content_type = k8s_client.select_header_content_type
5✔
650
        k8s_client.select_header_content_type = self._select_header_content_type_patch
5✔
651

652
        return k8s_client
5✔
653

654
    def _select_header_content_type_patch(self, content_types):
5✔
655
        """Returns `Content-Type` based on an array of content_types provided.
656
        :param content_types: List of content-types.
657
        :return: Content-Type (e.g. application/json).
658
        """
659

660
        content_type = self.context.k8s.client._select_header_content_type(content_types)
×
661
        if content_type == "application/merge-patch+json":
×
662
            return "application/json-patch+json"
×
663
        return content_type
×
664

665
    def __repr__(self):
5✔
666
        return "Kubernetes Plugin"
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc