• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 19252312831

11 Nov 2025 01:54AM UTC coverage: 77.467% (+2.1%) from 75.318%
19252312831

Pull #92

github

web-flow
Merge 34555e628 into 1ca3f8432
Pull Request #92: Add assert_plugin to augment register_plugin to indicate dependency

654 of 1001 branches covered (65.33%)

Branch coverage included in aggregate %.

10 of 17 new or added lines in 3 files covered. (58.82%)

13 existing lines in 2 files now uncovered.

2526 of 3104 relevant lines covered (81.38%)

3.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.79
/src/main/python/kubernator/plugins/k8s.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19

20
import json
4✔
21
import logging
4✔
22
import re
4✔
23
import sys
4✔
24
import types
4✔
25
from collections.abc import Mapping
4✔
26
from functools import partial
4✔
27
from importlib.metadata import version as pkg_version
4✔
28
from pathlib import Path
4✔
29
from typing import Iterable, Callable, Sequence
4✔
30

31
import jsonpatch
4✔
32
import yaml
4✔
33
from kubernetes.client import ApiException
4✔
34

35
from kubernator.api import (KubernatorPlugin,
4✔
36
                            Globs,
37
                            scan_dir,
38
                            load_file,
39
                            FileType,
40
                            load_remote_file,
41
                            StripNL,
42
                            install_python_k8s_client,
43
                            TemplateEngine,
44
                            sleep)
45
from kubernator.merge import extract_merge_instructions, apply_merge_instructions
4✔
46
from kubernator.plugins.k8s_api import (K8SResourcePluginMixin,
4✔
47
                                        K8SResource,
48
                                        K8SResourcePatchType,
49
                                        K8SPropagationPolicy)
50

51
logger = logging.getLogger("kubernator.k8s")
4✔
52
proc_logger = logger.getChild("proc")
4✔
53
stdout_logger = StripNL(proc_logger.info)
4✔
54
stderr_logger = StripNL(proc_logger.warning)
4✔
55

56
FIELD_VALIDATION_STRICT_MARKER = "strict decoding error: "
4✔
57
VALID_FIELD_VALIDATION = ("Ignore", "Warn", "Strict")
4✔
58

59

60
def final_resource_validator(resources: Sequence[K8SResource],
4✔
61
                             resource: K8SResource,
62
                             error: Callable[..., Exception]) -> Iterable[Exception]:
63
    final_key = resource.get_manifest_key(resource.manifest)
4✔
64
    if final_key != resource.key:
4!
65
        yield error("Illegal change of identifiers of the resource "
×
66
                    "%s from %s have been changed to %s",
67
                    resource.key, resource.source, final_key)
68

69
    if resource.rdef.namespaced and not resource.namespace:
4!
70
        yield error("Namespaced resource %s from %s is missing the required namespace",
×
71
                    resource, resource.source)
72

73

74
def normalize_pkg_version(v: str):
4✔
75
    v_split = v.split(".")
4✔
76
    rev = v_split[-1]
4✔
77
    if not rev.isdigit():
4✔
78
        new_rev = ""
4✔
79
        for c in rev:
4!
80
            if not c.isdigit():
4✔
81
                break
4✔
82
            new_rev += c
4✔
83
        v_split[-1] = new_rev
4✔
84
    return tuple(map(int, v_split))
4✔
85

86

87
def api_exc_normalize_body(e: "ApiException"):
4✔
88
    if e.headers and "content-type" in e.headers:
4!
89
        content_type = e.headers["content-type"]
4✔
90
        if content_type == "application/json" or content_type.endswith("+json"):
4!
91
            e.body = json.loads(e.body)
4✔
92
        elif (content_type in ("application/yaml", "application/x-yaml", "text/yaml",
×
93
                               "text/x-yaml") or content_type.endswith("+yaml")):
94
            e.body = yaml.safe_load(e.body)
×
95

96

97
def api_exc_format_body(e: ApiException):
4✔
98
    if not isinstance(e.body, (str, bytes)):
4!
99
        e.body = json.dumps(e.body, indent=4)
4✔
100

101

102
class KubernetesPlugin(KubernatorPlugin, K8SResourcePluginMixin):
4✔
103
    logger = logger
4✔
104

105
    _name = "k8s"
4✔
106

107
    def __init__(self):
4✔
108
        super().__init__()
4✔
109
        self.context = None
4✔
110

111
        self.embedded_pkg_version = self._get_kubernetes_client_version()
4✔
112

113
        self._transformers = []
4✔
114
        self._validators = []
4✔
115
        self._manifest_patchers = []
4✔
116
        self._summary = 0, 0, 0
4✔
117
        self._template_engine = TemplateEngine(logger)
4✔
118

119
    def set_context(self, context):
4✔
120
        self.context = context
4✔
121

122
    def register(self,
4✔
123
                 field_validation="Warn",
124
                 field_validation_warn_fatal=True,
125
                 disable_client_patches=False):
126
        self.context.app.register_plugin("kubeconfig")
4✔
127

128
        if field_validation not in VALID_FIELD_VALIDATION:
4!
129
            raise ValueError("'field_validation' must be one of %s" % (", ".join(VALID_FIELD_VALIDATION)))
×
130

131
        context = self.context
4✔
132
        context.globals.k8s = dict(patch_field_excludes=("^/metadata/managedFields",
4✔
133
                                                         "^/metadata/generation",
134
                                                         "^/metadata/creationTimestamp",
135
                                                         "^/metadata/resourceVersion",
136
                                                         ),
137
                                   immutable_changes={("apps", "DaemonSet"): K8SPropagationPolicy.BACKGROUND,
138
                                                      ("apps", "StatefulSet"): K8SPropagationPolicy.ORPHAN,
139
                                                      ("apps", "Deployment"): K8SPropagationPolicy.ORPHAN,
140
                                                      ("storage.k8s.io", "StorageClass"): K8SPropagationPolicy.ORPHAN,
141
                                                      (None, "Pod"): K8SPropagationPolicy.BACKGROUND,
142
                                                      ("batch", "Job"): K8SPropagationPolicy.ORPHAN,
143
                                                      },
144
                                   default_includes=Globs(["*.yaml", "*.yml"], True),
145
                                   default_excludes=Globs([".*"], True),
146
                                   add_resources=self.add_resources,
147
                                   load_resources=self.api_load_resources,
148
                                   load_remote_resources=self.api_load_remote_resources,
149
                                   load_crds=self.api_load_crds,
150
                                   load_remote_crds=self.api_load_remote_crds,
151
                                   add_transformer=self.api_add_transformer,
152
                                   remove_transformer=self.api_remove_transformer,
153
                                   add_validator=self.api_remove_validator,
154
                                   add_manifest_patcher=self.api_add_manifest_patcher,
155
                                   get_api_versions=self.get_api_versions,
156
                                   create_resource=self.create_resource,
157
                                   disable_client_patches=disable_client_patches,
158
                                   field_validation=field_validation,
159
                                   field_validation_warn_fatal=field_validation_warn_fatal,
160
                                   field_validation_warnings=0,
161
                                   conflict_retry_delay=0.3,
162
                                   _k8s=self,
163
                                   )
164
        context.k8s = dict(default_includes=Globs(context.globals.k8s.default_includes),
4✔
165
                           default_excludes=Globs(context.globals.k8s.default_excludes)
166
                           )
167
        self.api_add_validator(final_resource_validator)
4✔
168

169
    def handle_init(self):
4✔
170
        pass
4✔
171

172
    def handle_start(self):
4✔
173
        self.context.kubeconfig.register_change_notifier(self._kubeconfig_changed)
4✔
174
        self.setup_client()
4✔
175

176
    def _kubeconfig_changed(self):
4✔
177
        self.setup_client()
×
178

179
    def _get_kubernetes_client_version(self):
4✔
180
        return pkg_version("kubernetes").split(".")
4✔
181

182
    def setup_client(self):
4✔
183
        k8s = self.context.k8s
4✔
184
        if "server_version" not in k8s:
4!
185
            self._setup_client()
4✔
186

187
        server_minor = k8s.server_version[1]
4✔
188

189
        logger.info("Using Kubernetes client version =~%s.0 for server version %s",
4✔
190
                    server_minor, ".".join(k8s.server_version))
191
        pkg_dir = install_python_k8s_client(self.context.app.run_passthrough_capturing, server_minor, logger,
4✔
192
                                            stdout_logger, stderr_logger, k8s.disable_client_patches)
193

194
        modules_to_delete = []
4✔
195
        for k, v in sys.modules.items():
4✔
196
            if k == "kubernetes" or k.startswith("kubernetes."):
4✔
197
                modules_to_delete.append(k)
4✔
198
        for k in modules_to_delete:
4✔
199
            del sys.modules[k]
4✔
200

201
        logger.info("Adding sys.path reference to %s", pkg_dir)
4✔
202
        sys.path.insert(0, str(pkg_dir))
4✔
203
        self.embedded_pkg_version = self._get_kubernetes_client_version()
4✔
204
        logger.info("Switching to Kubernetes client version %s", ".".join(self.embedded_pkg_version))
4✔
205
        self._setup_client()
4✔
206

207
        logger.debug("Reading Kubernetes OpenAPI spec for %s", k8s.server_git_version)
4✔
208

209
        k8s_def = load_remote_file(logger, f"https://raw.githubusercontent.com/kubernetes/kubernetes/"
4✔
210
                                           f"{k8s.server_git_version}/api/openapi-spec/swagger.json",
211
                                   FileType.JSON)
212
        self.resource_definitions_schema = k8s_def
4✔
213

214
        self._populate_resource_definitions()
4✔
215

216
    def _setup_client(self):
4✔
217
        from kubernetes import client
4✔
218

219
        context = self.context
4✔
220
        k8s = context.k8s
4✔
221

222
        k8s.client = self._setup_k8s_client()
4✔
223
        version = client.VersionApi(k8s.client).get_code()
4✔
224
        if "-eks-" or "-gke" in version.git_version:
4!
225
            git_version = version.git_version.split("-")[0]
4✔
226
        else:
227
            git_version = version.git_version
228

229
        k8s.server_version = git_version[1:].split(".")
4✔
230
        k8s.server_git_version = git_version
4✔
231

232
        logger.info("Found Kubernetes %s on %s", k8s.server_git_version, k8s.client.configuration.host)
4✔
233

234
        K8SResource._k8s_client_version = normalize_pkg_version(pkg_version("kubernetes"))
4✔
235
        K8SResource._k8s_field_validation = k8s.field_validation
4✔
236
        K8SResource._k8s_field_validation_patched = not k8s.disable_client_patches
4✔
237
        K8SResource._logger = self.logger
4✔
238
        K8SResource._api_warnings = self._api_warnings
4✔
239

240
    def _api_warnings(self, resource, warn):
4✔
241
        k8s = self.context.k8s
4✔
242
        self.context.globals.k8s.field_validation_warnings += 1
4✔
243

244
        log = self.logger.warning
4✔
245
        if k8s.field_validation_warn_fatal:
4✔
246
            log = self.logger.error
4✔
247

248
        log("FAILED FIELD VALIDATION on resource %s from %s: %s", resource, resource.source, warn)
4✔
249

250
    def handle_before_dir(self, cwd: Path):
4✔
251
        context = self.context
4✔
252
        context.k8s.default_includes = Globs(context.k8s.default_includes)
4✔
253
        context.k8s.default_excludes = Globs(context.k8s.default_excludes)
4✔
254
        context.k8s.includes = Globs(context.k8s.default_includes)
4✔
255
        context.k8s.excludes = Globs(context.k8s.default_excludes)
4✔
256

257
    def handle_after_dir(self, cwd: Path):
4✔
258
        context = self.context
4✔
259
        k8s = context.k8s
4✔
260

261
        for f in scan_dir(logger, cwd, lambda d: d.is_file(), k8s.excludes, k8s.includes):
4✔
262
            p = cwd / f.name
4✔
263
            display_p = context.app.display_path(p)
4✔
264
            logger.debug("Adding Kubernetes manifest from %s", display_p)
4✔
265

266
            manifests = load_file(logger, p, FileType.YAML, display_p,
4✔
267
                                  self._template_engine,
268
                                  {"ktor": context}
269
                                  )
270

271
            for manifest in manifests:
4✔
272
                if manifest:
4!
273
                    self.add_resource(manifest, display_p)
4✔
274

275
    def handle_apply(self):
4✔
276
        context = self.context
4✔
277
        k8s = context.k8s
4✔
278

279
        self._validate_resources()
4✔
280

281
        cmd = context.app.args.command
4✔
282
        file = context.app.args.file
4✔
283
        file_format = context.app.args.output_format
4✔
284
        dry_run = context.app.args.dry_run
4✔
285
        dump = cmd == "dump"
4✔
286

287
        status_msg = f"{' (dump only)' if dump else ' (dry run)' if dry_run else ''}"
4✔
288
        if dump:
4✔
289
            logger.info("Will dump the changes into a file %s in %s format", file, file_format)
4✔
290

291
        patch_field_excludes = [re.compile(e) for e in context.globals.k8s.patch_field_excludes]
4✔
292
        dump_results = []
4✔
293
        total_created, total_patched, total_deleted = 0, 0, 0
4✔
294
        for resource in self.resources.values():
4✔
295
            if dump:
4✔
296
                resource_id = {"apiVersion": resource.api_version,
4✔
297
                               "kind": resource.kind,
298
                               "name": resource.name
299
                               }
300

301
                def patch_func(patch):
4✔
302
                    if resource.rdef.namespaced:
4!
303
                        resource_id["namespace"] = resource.namespace
×
304
                    method_descriptor = {"method": "patch",
4✔
305
                                         "resource": resource_id,
306
                                         "body": patch
307
                                         }
308
                    dump_results.append(method_descriptor)
4✔
309

310
                def create_func():
4✔
311
                    method_descriptor = {"method": "create",
4✔
312
                                         "body": resource.manifest}
313
                    dump_results.append(method_descriptor)
4✔
314

315
                def delete_func(*, propagation_policy):
4✔
316
                    method_descriptor = {"method": "delete",
×
317
                                         "resource": resource_id,
318
                                         "propagation_policy": propagation_policy.policy
319
                                         }
320
                    dump_results.append(method_descriptor)
×
321
            else:
322
                patch_func = partial(resource.patch, patch_type=K8SResourcePatchType.JSON_PATCH, dry_run=dry_run)
4✔
323
                create_func = partial(resource.create, dry_run=dry_run)
4✔
324
                delete_func = partial(resource.delete, dry_run=dry_run)
4✔
325

326
            created, patched, deleted = self._apply_resource(dry_run,
4✔
327
                                                             patch_field_excludes,
328
                                                             resource,
329
                                                             patch_func,
330
                                                             create_func,
331
                                                             delete_func,
332
                                                             status_msg)
333

334
            total_created += created
4✔
335
            total_patched += patched
4✔
336
            total_deleted += deleted
4✔
337

338
        if ((dump or dry_run) and
4✔
339
                k8s.field_validation_warn_fatal and self.context.globals.k8s.field_validation_warnings):
340
            msg = ("There were %d field validation warnings and the warnings are fatal!" %
4✔
341
                   self.context.globals.k8s.field_validation_warnings)
342
            logger.fatal(msg)
4✔
343
            raise RuntimeError(msg)
4✔
344

345
        if dump:
4✔
346
            if file_format in ("json", "json-pretty"):
4!
347
                json.dump(dump_results, file, sort_keys=True,
×
348
                          indent=4 if file_format == "json-pretty" else None)
349
            else:
350
                yaml.safe_dump(dump_results, file)
4✔
351
        else:
352
            self._summary = total_created, total_patched, total_deleted
4✔
353

354
    def handle_summary(self):
4✔
355
        total_created, total_patched, total_deleted = self._summary
4✔
356
        logger.info("Created %d, patched %d, deleted %d resources", total_created, total_patched, total_deleted)
4✔
357

358
    def api_load_resources(self, path: Path, file_type: str):
4✔
359
        return self.add_local_resources(path, FileType[file_type.upper()])
×
360

361
    def api_load_remote_resources(self, url: str, file_type: str, file_category=None):
4✔
362
        return self.add_remote_resources(url, FileType[file_type.upper()], sub_category=file_category)
×
363

364
    def api_load_crds(self, path: Path, file_type: str):
4✔
365
        return self.add_local_crds(path, FileType[file_type.upper()])
4✔
366

367
    def api_load_remote_crds(self, url: str, file_type: str, file_category=None):
4✔
368
        return self.add_remote_crds(url, FileType[file_type.upper()], sub_category=file_category)
4✔
369

370
    def api_add_transformer(self, transformer):
4✔
371
        if transformer not in self._transformers:
4!
372
            self._transformers.append(transformer)
4✔
373

374
    def api_add_validator(self, validator):
4✔
375
        if validator not in self._validators:
4!
376
            self._validators.append(validator)
4✔
377

378
    def api_add_manifest_patcher(self, patcher):
4✔
379
        if patcher not in self._manifest_patchers:
×
380
            self._manifest_patchers.append(patcher)
×
381

382
    def api_remove_transformer(self, transformer):
4✔
383
        if transformer in self._transformers:
4!
384
            self._transformers.remove(transformer)
4✔
385

386
    def api_remove_validator(self, validator):
4✔
387
        if validator not in self._validators:
×
388
            self._validators.remove(validator)
×
389

390
    def api_validation_error(self, msg, *args):
4✔
391
        frame = sys._getframe().f_back
×
392
        tb = None
×
UNCOV
393
        while True:
394
            if not frame:
×
395
                break
×
396
            tb = types.TracebackType(tb, frame, frame.f_lasti, frame.f_lineno)
×
397
            frame = frame.f_back
×
398
        return ValueError((msg % args) if args else msg).with_traceback(tb)
×
399

400
    def _patch_manifest(self,
4✔
401
                        manifest: dict,
402
                        resource_description: str):
403
        for patcher in reversed(self._manifest_patchers):
4!
404
            logger.debug("Applying patcher %s to %s",
×
405
                         getattr(patcher, "__name__", patcher),
406
                         resource_description)
407
            manifest = patcher(manifest, resource_description) or manifest
×
408

409
        return manifest
4✔
410

411
    def _transform_resource(self, resources: Sequence[K8SResource], resource: K8SResource) -> K8SResource:
4✔
412
        for transformer in reversed(self._transformers):
4✔
413
            logger.debug("Applying transformer %s to %s from %s",
4✔
414
                         getattr(transformer, "__name__", transformer),
415
                         resource, resource.source)
416
            resource = transformer(resources, resource) or resource
4✔
417

418
        return resource
4✔
419

420
    def _validate_resources(self):
4✔
421
        errors: list[Exception] = []
4✔
422
        for resource in self.resources.values():
4✔
423
            for validator in reversed(self._validators):
4✔
424
                logger.debug("Applying validator %s to %s from %s",
4✔
425
                             getattr(validator, "__name__", validator),
426
                             resource, resource.source)
427
                errors.extend(validator(self.resources, resource, self.api_validation_error))
4✔
428
        if errors:
4!
429
            for error in errors:
×
430
                logger.error("Validation error: %s", error)
×
431
            raise errors[0]
×
432

433
    def _apply_resource(self,
4✔
434
                        dry_run,
435
                        patch_field_excludes: Iterable[re.compile],
436
                        resource: K8SResource,
437
                        patch_func: Callable[[Iterable[dict]], None],
438
                        create_func: Callable[[], None],
439
                        delete_func: Callable[[K8SPropagationPolicy], None],
440
                        status_msg):
441
        from kubernetes import client
4✔
442
        from kubernetes.client.rest import ApiException
4✔
443

444
        rdef = resource.rdef
4✔
445
        rdef.populate_api(client, self.context.k8s.client)
4✔
446

447
        def handle_400_strict_validation_error(e: ApiException):
4✔
448
            if e.status == 400:
4!
449
                # Assumes the body has been parsed
450
                status = e.body
4✔
451
                if status["status"] == "Failure":
4!
452
                    if FIELD_VALIDATION_STRICT_MARKER in status["message"]:
4!
453
                        message = status["message"]
4✔
454
                        messages = message[message.find(FIELD_VALIDATION_STRICT_MARKER) +
4✔
455
                                           len(FIELD_VALIDATION_STRICT_MARKER):].split(",")
456
                        for m in messages:
4✔
457
                            self._api_warnings(resource, m.strip())
4✔
458

459
                        raise e from None
4✔
460
                    else:
461
                        logger.error("FAILED MODIFYING resource %s from %s: %s",
×
462
                                     resource, resource.source, status["message"])
463
                        raise e from None
×
464

465
        def create(exists_ok=False, wait_for_delete=False):
4✔
466
            logger.info("Creating resource %s%s%s", resource, status_msg,
4✔
467
                        " (ignoring existing)" if exists_ok else "")
468
            while True:
3✔
469
                try:
4✔
470
                    create_func()
4✔
471
                    return
4✔
472
                except ApiException as __e:
4✔
473
                    api_exc_normalize_body(__e)
4✔
474
                    try:
4✔
475
                        if exists_ok or wait_for_delete:
4!
476
                            if __e.status == 409:
×
477
                                status = __e.body
×
478
                                if status["reason"] == "AlreadyExists":
×
479
                                    if wait_for_delete:
×
480
                                        sleep(self.context.k8s.conflict_retry_delay)
×
481
                                        logger.info("Retry creating resource %s%s%s", resource, status_msg,
×
482
                                                    " (ignoring existing)" if exists_ok else "")
483
                                        continue
×
484
                                    else:
485
                                        return
×
486
                        raise
4✔
487
                    except ApiException as ___e:
4✔
488
                        api_exc_format_body(___e)
4✔
489
                        raise
4✔
490

491
        merge_instrs, normalized_manifest = extract_merge_instructions(resource.manifest, resource)
4✔
492
        if merge_instrs:
4✔
493
            logger.trace("Normalized manifest (no merge instructions) for resource %s: %s", resource,
4✔
494
                         normalized_manifest)
495
        else:
496
            normalized_manifest = resource.manifest
4✔
497

498
        logger.debug("Applying resource %s%s", resource, status_msg)
4✔
499
        try:
4✔
500
            remote_resource = resource.get()
4✔
501
            logger.trace("Current resource %s: %s", resource, remote_resource)
4✔
502
        except ApiException as e:
4✔
503
            api_exc_normalize_body(e)
4✔
504
            try:
4✔
505
                if e.status == 404:
4!
506
                    try:
4✔
507
                        create()
4✔
508
                        return 1, 0, 0
4✔
509
                    except ApiException as e:
4✔
510
                        api_exc_normalize_body(e)
4✔
511
                        if not handle_400_strict_validation_error(e):
4!
UNCOV
512
                            raise
1✔
513
                else:
514
                    raise
×
515
            except ApiException as _e:
4✔
516
                api_exc_format_body(_e)
4✔
517
                raise
4✔
518
        else:
519
            logger.trace("Attempting to retrieve a normalized patch for resource %s: %s", resource, normalized_manifest)
4✔
520
            try:
4✔
521
                merged_resource = resource.patch(normalized_manifest,
4✔
522
                                                 patch_type=K8SResourcePatchType.SERVER_SIDE_PATCH,
523
                                                 dry_run=True,
524
                                                 force=True)
525
            except ApiException as e:
×
526
                try:
×
527
                    api_exc_normalize_body(e)
×
528

529
                    if e.status == 422:
×
530
                        status = e.body
×
531
                        # Assumes the body has been unmarshalled
532
                        details = status["details"]
×
533
                        immutable_key = details.get("group"), details["kind"]
×
534

535
                        try:
×
536
                            propagation_policy = self.context.k8s.immutable_changes[immutable_key]
×
537
                        except KeyError:
×
538
                            raise e from None
×
539
                        else:
540
                            for cause in details["causes"]:
×
541
                                if (
×
542
                                        cause["reason"] == "FieldValueInvalid" and
543
                                        "field is immutable" in cause["message"]
544
                                        or
545
                                        cause["reason"] == "FieldValueForbidden" and
546
                                        ("Forbidden: updates to" in cause["message"]
547
                                         or
548
                                         "Forbidden: pod updates" in cause["message"])
549
                                ):
550
                                    logger.info("Deleting resource %s (cascade %s)%s", resource,
×
551
                                                propagation_policy.policy,
552
                                                status_msg)
553
                                    delete_func(propagation_policy=propagation_policy)
×
554
                                    create(exists_ok=dry_run, wait_for_delete=not dry_run)
×
555
                                    return 1, 0, 1
×
556
                            raise
×
557
                    else:
558
                        if not handle_400_strict_validation_error(e):
×
559
                            raise
×
560
                except ApiException as _e:
×
561
                    api_exc_format_body(_e)
×
562
                    raise
×
563

564
            else:
565
                logger.trace("Merged resource %s: %s", resource, merged_resource)
4✔
566
                if merge_instrs:
4✔
567
                    apply_merge_instructions(merge_instrs, normalized_manifest, merged_resource, logger, resource)
4✔
568

569
                patch = jsonpatch.make_patch(remote_resource, merged_resource)
4✔
570
                logger.trace("Resource %s initial patches are: %s", resource, patch)
4✔
571
                patch = self._filter_resource_patch(patch, patch_field_excludes)
4✔
572
                logger.trace("Resource %s final patches are: %s", resource, patch)
4✔
573
                if patch:
4✔
574
                    logger.info("Patching resource %s%s", resource, status_msg)
4✔
575
                    patch_func(patch)
4✔
576
                    return 0, 1, 0
4✔
577
                else:
578
                    logger.info("Nothing to patch for resource %s", resource)
4✔
579
                    return 0, 0, 0
4✔
580

581
    def _filter_resource_patch(self, patch: Iterable[Mapping], excludes: Iterable[re.compile]):
4✔
582
        result = []
4✔
583
        for op in patch:
4✔
584
            path = op["path"]
4✔
585
            excluded = False
4✔
586
            for exclude in excludes:
4✔
587
                if exclude.match(path):
4✔
588
                    logger.trace("Excluding %r from patch %s", op, patch)
4✔
589
                    excluded = True
4✔
590
                    break
4✔
591
            if excluded:
4✔
592
                continue
4✔
593
            result.append(op)
4✔
594
        return result
4✔
595

596
    def _setup_k8s_client(self):
4✔
597
        from kubernetes import client
4✔
598
        from kubernetes.config import load_incluster_config, load_kube_config, ConfigException
4✔
599

600
        try:
4✔
601
            logger.debug("Trying K8S in-cluster configuration")
4✔
602
            load_incluster_config()
4✔
603
            logger.info("Running K8S with in-cluster configuration")
×
604
        except ConfigException as e:
4✔
605
            logger.trace("K8S in-cluster configuration failed", exc_info=e)
4✔
606
            logger.debug("Initializing K8S with kubeconfig configuration")
4✔
607
            load_kube_config(config_file=self.context.kubeconfig.kubeconfig)
4✔
608

609
        k8s_client = client.ApiClient()
4✔
610

611
        # Patch the header content type selector to allow json patch
612
        k8s_client._select_header_content_type = k8s_client.select_header_content_type
4✔
613
        k8s_client.select_header_content_type = self._select_header_content_type_patch
4✔
614

615
        return k8s_client
4✔
616

617
    def _select_header_content_type_patch(self, content_types):
4✔
618
        """Returns `Content-Type` based on an array of content_types provided.
619
        :param content_types: List of content-types.
620
        :return: Content-Type (e.g. application/json).
621
        """
622

623
        content_type = self.context.k8s.client._select_header_content_type(content_types)
×
624
        if content_type == "application/merge-patch+json":
×
625
            return "application/json-patch+json"
×
626
        return content_type
×
627

628
    def __repr__(self):
4✔
629
        return "Kubernetes Plugin"
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc