• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 18432987062

11 Oct 2025 06:10PM UTC coverage: 75.397% (+0.06%) from 75.342%
18432987062

push

github

web-flow
Merge pull request #85 from karellen/schema_format_validation_not_none

When validating schema format numbers (int32 etc) disallow None

627 of 983 branches covered (63.78%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

6 existing lines in 4 files now uncovered.

2456 of 3106 relevant lines covered (79.07%)

4.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.9
/src/main/python/kubernator/plugins/k8s.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19

20
import json
6✔
21
import logging
6✔
22
import re
6✔
23
import sys
6✔
24
import types
6✔
25
from collections.abc import Mapping
6✔
26
from functools import partial
6✔
27
from importlib.metadata import version as pkg_version
6✔
28
from pathlib import Path
6✔
29
from typing import Iterable, Callable, Sequence
6✔
30

31
import jsonpatch
6✔
32
import yaml
6✔
33

34
from kubernator.api import (KubernatorPlugin,
6✔
35
                            Globs,
36
                            scan_dir,
37
                            load_file,
38
                            FileType,
39
                            load_remote_file,
40
                            StripNL,
41
                            install_python_k8s_client,
42
                            TemplateEngine,
43
                            sleep)
44
from kubernator.merge import extract_merge_instructions, apply_merge_instructions
6✔
45
from kubernator.plugins.k8s_api import (K8SResourcePluginMixin,
6✔
46
                                        K8SResource,
47
                                        K8SResourcePatchType,
48
                                        K8SPropagationPolicy)
49

50
# Module-level logger for the Kubernetes plugin.
logger = logging.getLogger("kubernator.k8s")
# Child logger used for output of subprocesses started on behalf of the plugin.
proc_logger = logger.getChild("proc")
# Callables handed to install_python_k8s_client as stdout/stderr sinks.
# NOTE(review): StripNL presumably strips trailing newlines before logging - confirm in kubernator.api.
stdout_logger = StripNL(proc_logger.info)
stderr_logger = StripNL(proc_logger.warning)

# Substring searched for in the "message" of an API 400 "Failure" status to
# recognize strict field validation errors.
FIELD_VALIDATION_STRICT_MARKER = "strict decoding error: "
# Values accepted for the `field_validation` argument of KubernetesPlugin.register.
VALID_FIELD_VALIDATION = ("Ignore", "Warn", "Strict")
57

58

59
def final_resource_validator(resources: Sequence[K8SResource],
                             resource: K8SResource,
                             error: Callable[..., Exception]) -> Iterable[Exception]:
    """Yield validation errors for a resource in its final (post-transformation) state.

    Two checks are performed:

    * the key computed from the current manifest must still equal ``resource.key``;
    * a resource whose definition is namespaced must carry a namespace.

    :param resources: all known resources (not used by this validator)
    :param resource: the resource being validated
    :param error: factory turning a ``%``-style message and its arguments into an exception
    :return: an iterator over the exceptions produced by ``error``
    """
    manifest_key = resource.get_manifest_key(resource.manifest)
    identifiers_changed = manifest_key != resource.key
    if identifiers_changed:
        yield error("Illegal change of identifiers of the resource "
                    "%s from %s have been changed to %s",
                    resource.key, resource.source, manifest_key)

    namespace_missing = resource.rdef.namespaced and not resource.namespace
    if namespace_missing:
        yield error("Namespaced resource %s from %s is missing the required namespace",
                    resource, resource.source)
71

72

73
def normalize_pkg_version(v: str) -> tuple[int, ...]:
    """Convert a dotted package version string into a tuple of integers.

    Only the last dot-separated component may carry a non-numeric suffix
    (e.g. ``"28.1.0+snapshot"`` or ``"12.0.0a1"``); the suffix is cut off at the
    first non-digit character. If the last component has no leading digits at
    all (e.g. ``"1.2.dev0"``) it is dropped instead of being fed to ``int("")``.

    :param v: version string such as the one returned by ``importlib.metadata.version``
    :return: tuple of the numeric version components
    :raises ValueError: if any remaining component is not an integer
    """
    parts = v.split(".")
    rev = parts[-1]
    if not rev.isdigit():
        # Length of the leading run of digits in the last component
        prefix_len = next((i for i, c in enumerate(rev) if not c.isdigit()), len(rev))
        digits = rev[:prefix_len]
        if digits or len(parts) == 1:
            # A single non-numeric component still fails in int() below, as before
            parts[-1] = digits
        else:
            # Pure suffix component such as "dev0" or "post1": not part of the numeric version
            parts.pop()
    return tuple(map(int, parts))
84

85

86
class KubernetesPlugin(KubernatorPlugin, K8SResourcePluginMixin):
    """Kubernator plugin that loads, transforms, validates and applies Kubernetes resources.

    The plugin publishes its API under ``context.globals.k8s`` (see ``register``),
    collects manifests from scanned directories, and on ``handle_apply`` either
    applies the resulting changes to the cluster or dumps them to a file.
    """
    logger = logger

    _name = "k8s"

    def __init__(self):
        """Create the plugin with no context, transformers or validators."""
        super().__init__()
        self.context = None

        # Dot-split version (list of strings) of the currently importable ``kubernetes`` package
        self.embedded_pkg_version = self._get_kubernetes_client_version()

        self._transformers = []
        self._validators = []
        # (created, patched, deleted) totals reported by ``handle_summary``
        self._summary = 0, 0, 0
        self._template_engine = TemplateEngine(logger)

    def set_context(self, context):
        """Attach the Kubernator context used by all other methods."""
        self.context = context

    def register(self,
                 field_validation="Warn",
                 field_validation_warn_fatal=True,
                 disable_client_patches=False):
        """Register the plugin and publish its settings and API on the context.

        :param field_validation: one of ``VALID_FIELD_VALIDATION``
        :param field_validation_warn_fatal: when true, field validation warnings are logged
            as errors and make dump / dry-run fail in ``handle_apply``
        :param disable_client_patches: passed on to ``install_python_k8s_client`` and, negated,
            stored as ``K8SResource._k8s_field_validation_patched``
        :raises ValueError: if ``field_validation`` is not one of the accepted values
        """
        self.context.app.register_plugin("kubeconfig")

        if field_validation not in VALID_FIELD_VALIDATION:
            raise ValueError("'field_validation' must be one of %s" % (", ".join(VALID_FIELD_VALIDATION)))

        context = self.context
        context.globals.k8s = dict(patch_field_excludes=("^/metadata/managedFields",
                                                         "^/metadata/generation",
                                                         "^/metadata/creationTimestamp",
                                                         "^/metadata/resourceVersion",
                                                         ),
                                   immutable_changes={("apps", "DaemonSet"): K8SPropagationPolicy.BACKGROUND,
                                                      ("apps", "StatefulSet"): K8SPropagationPolicy.ORPHAN,
                                                      ("apps", "Deployment"): K8SPropagationPolicy.ORPHAN,
                                                      ("storage.k8s.io", "StorageClass"): K8SPropagationPolicy.ORPHAN,
                                                      (None, "Pod"): K8SPropagationPolicy.BACKGROUND,
                                                      },
                                   default_includes=Globs(["*.yaml", "*.yml"], True),
                                   default_excludes=Globs([".*"], True),
                                   add_resources=self.add_resources,
                                   load_resources=self.api_load_resources,
                                   load_remote_resources=self.api_load_remote_resources,
                                   load_crds=self.api_load_crds,
                                   load_remote_crds=self.api_load_remote_crds,
                                   add_transformer=self.api_add_transformer,
                                   remove_transformer=self.api_remove_transformer,
                                   # "add" must register the validator, not remove it
                                   add_validator=self.api_add_validator,
                                   remove_validator=self.api_remove_validator,
                                   get_api_versions=self.get_api_versions,
                                   create_resource=self.create_resource,
                                   disable_client_patches=disable_client_patches,
                                   field_validation=field_validation,
                                   field_validation_warn_fatal=field_validation_warn_fatal,
                                   field_validation_warnings=0,
                                   conflict_retry_delay=0.3,
                                   _k8s=self,
                                   )
        context.k8s = dict(default_includes=Globs(context.globals.k8s.default_includes),
                           default_excludes=Globs(context.globals.k8s.default_excludes)
                           )
        self.api_add_validator(final_resource_validator)

    def handle_init(self):
        """Nothing to do at init time."""
        pass

    def handle_start(self):
        """Subscribe to kubeconfig changes and set up the Kubernetes client."""
        self.context.kubeconfig.register_change_notifier(self._kubeconfig_changed)
        self.setup_client()

    def _kubeconfig_changed(self):
        """Rebuild the client when the kubeconfig plugin reports a change."""
        self.setup_client()

    def _get_kubernetes_client_version(self):
        """Return the installed ``kubernetes`` package version split on dots (list of strings)."""
        return pkg_version("kubernetes").split(".")

    def setup_client(self):
        """Install and switch to a Kubernetes client matching the server, then load its OpenAPI spec.

        The server version is discovered with whatever client is currently importable
        (unless already known), a matching client package is installed and put first on
        ``sys.path``, previously imported ``kubernetes`` modules are purged so the new
        package is picked up, and the client is set up again.
        """
        k8s = self.context.k8s
        if "server_version" not in k8s:
            self._setup_client()

        server_minor = k8s.server_version[1]

        logger.info("Using Kubernetes client version =~%s.0 for server version %s",
                    server_minor, ".".join(k8s.server_version))
        pkg_dir = install_python_k8s_client(self.context.app.run_passthrough_capturing, server_minor, logger,
                                            stdout_logger, stderr_logger, k8s.disable_client_patches)

        # Forget every already-imported ``kubernetes`` module; iterate over a snapshot
        # because entries are deleted from ``sys.modules`` right after.
        modules_to_delete = [name for name in list(sys.modules)
                             if name == "kubernetes" or name.startswith("kubernetes.")]
        for name in modules_to_delete:
            del sys.modules[name]

        logger.info("Adding sys.path reference to %s", pkg_dir)
        sys.path.insert(0, str(pkg_dir))
        self.embedded_pkg_version = self._get_kubernetes_client_version()
        logger.info("Switching to Kubernetes client version %s", ".".join(self.embedded_pkg_version))
        self._setup_client()

        logger.debug("Reading Kubernetes OpenAPI spec for %s", k8s.server_git_version)

        k8s_def = load_remote_file(logger, f"https://raw.githubusercontent.com/kubernetes/kubernetes/"
                                           f"{k8s.server_git_version}/api/openapi-spec/swagger.json",
                                   FileType.JSON)
        self.resource_definitions_schema = k8s_def

        self._populate_resource_definitions()

    def _setup_client(self):
        """Create the API client, record the server version and configure ``K8SResource``."""
        from kubernetes import client

        context = self.context
        k8s = context.k8s

        k8s.client = self._setup_k8s_client()
        version = client.VersionApi(k8s.client).get_code()
        # Drop any dash-delimited build suffix (e.g. EKS "-eks-..." or GKE "-gke...").
        # The former test `"-eks-" or "-gke" in ...` was always true, so the suffix has
        # always been stripped unconditionally; this states that behavior explicitly.
        git_version = version.git_version.split("-")[0]

        # git_version looks like "v<major>.<minor>.<patch>": skip the first character and split
        k8s.server_version = git_version[1:].split(".")
        k8s.server_git_version = git_version

        logger.info("Found Kubernetes %s on %s", k8s.server_git_version, k8s.client.configuration.host)

        K8SResource._k8s_client_version = normalize_pkg_version(pkg_version("kubernetes"))
        K8SResource._k8s_field_validation = k8s.field_validation
        K8SResource._k8s_field_validation_patched = not k8s.disable_client_patches
        K8SResource._logger = self.logger
        K8SResource._api_warnings = self._api_warnings

    def _api_warnings(self, resource, warn):
        """Count and log a field validation warning for ``resource``.

        The message is logged as an error when ``field_validation_warn_fatal`` is set,
        otherwise as a warning.
        """
        k8s = self.context.k8s
        self.context.globals.k8s.field_validation_warnings += 1

        log = self.logger.warning
        if k8s.field_validation_warn_fatal:
            log = self.logger.error

        log("FAILED FIELD VALIDATION on resource %s from %s: %s", resource, resource.source, warn)

    def handle_before_dir(self, cwd: Path):
        """Give the directory its own copies of the default and effective include/exclude globs."""
        context = self.context
        context.k8s.default_includes = Globs(context.k8s.default_includes)
        context.k8s.default_excludes = Globs(context.k8s.default_excludes)
        context.k8s.includes = Globs(context.k8s.default_includes)
        context.k8s.excludes = Globs(context.k8s.default_excludes)

    def handle_after_dir(self, cwd: Path):
        """Load every matching YAML file in ``cwd`` and add its non-empty manifests as resources."""
        context = self.context
        k8s = context.k8s

        for f in scan_dir(logger, cwd, lambda d: d.is_file(), k8s.excludes, k8s.includes):
            p = cwd / f.name
            display_p = context.app.display_path(p)
            logger.debug("Adding Kubernetes manifest from %s", display_p)

            manifests = load_file(logger, p, FileType.YAML, display_p,
                                  self._template_engine,
                                  {"ktor": context}
                                  )

            for manifest in manifests:
                if manifest:
                    self.add_resource(manifest, display_p)

    def handle_apply(self):
        """Validate all resources, then apply them to the cluster or dump the changes.

        With the ``dump`` command the would-be operations are collected as descriptors and
        written to ``args.file`` as JSON or YAML; otherwise the operations go to the cluster
        (honoring ``args.dry_run``) and the totals are kept for ``handle_summary``.

        :raises RuntimeError: in dump or dry-run mode when field validation warnings were
            recorded and they are configured to be fatal
        """
        context = self.context
        k8s = context.k8s

        self._validate_resources()

        cmd = context.app.args.command
        file = context.app.args.file
        file_format = context.app.args.output_format
        dry_run = context.app.args.dry_run
        dump = cmd == "dump"

        status_msg = f"{' (dump only)' if dump else ' (dry run)' if dry_run else ''}"
        if dump:
            logger.info("Will dump the changes into a file %s in %s format", file, file_format)

        patch_field_excludes = [re.compile(e) for e in context.globals.k8s.patch_field_excludes]
        dump_results = []
        total_created, total_patched, total_deleted = 0, 0, 0
        for resource in self.resources.values():
            if dump:
                resource_id = {"apiVersion": resource.api_version,
                               "kind": resource.kind,
                               "name": resource.name
                               }
                # Set the namespace up front so that both "patch" and "delete"
                # descriptors identify a namespaced resource completely.
                if resource.rdef.namespaced:
                    resource_id["namespace"] = resource.namespace

                # The closures below are only invoked by _apply_resource within this iteration.
                def patch_func(patch):
                    method_descriptor = {"method": "patch",
                                         "resource": resource_id,
                                         "body": patch
                                         }
                    dump_results.append(method_descriptor)

                def create_func():
                    method_descriptor = {"method": "create",
                                         "body": resource.manifest}
                    dump_results.append(method_descriptor)

                def delete_func(*, propagation_policy):
                    method_descriptor = {"method": "delete",
                                         "resource": resource_id,
                                         "propagation_policy": propagation_policy.policy
                                         }
                    dump_results.append(method_descriptor)
            else:
                patch_func = partial(resource.patch, patch_type=K8SResourcePatchType.JSON_PATCH, dry_run=dry_run)
                create_func = partial(resource.create, dry_run=dry_run)
                delete_func = partial(resource.delete, dry_run=dry_run)

            created, patched, deleted = self._apply_resource(dry_run,
                                                             patch_field_excludes,
                                                             resource,
                                                             patch_func,
                                                             create_func,
                                                             delete_func,
                                                             status_msg)

            total_created += created
            total_patched += patched
            total_deleted += deleted

        if ((dump or dry_run) and
                k8s.field_validation_warn_fatal and self.context.globals.k8s.field_validation_warnings):
            msg = ("There were %d field validation warnings and the warnings are fatal!" %
                   self.context.globals.k8s.field_validation_warnings)
            logger.fatal(msg)
            raise RuntimeError(msg)

        if dump:
            if file_format in ("json", "json-pretty"):
                json.dump(dump_results, file, sort_keys=True,
                          indent=4 if file_format == "json-pretty" else None)
            else:
                yaml.safe_dump(dump_results, file)
        else:
            self._summary = total_created, total_patched, total_deleted

    def handle_summary(self):
        """Log the totals of created, patched and deleted resources."""
        total_created, total_patched, total_deleted = self._summary
        logger.info("Created %d, patched %d, deleted %d resources", total_created, total_patched, total_deleted)

    def api_load_resources(self, path: Path, file_type: str):
        """Load resources from a local path; ``file_type`` names a ``FileType`` member (case-insensitive)."""
        return self.add_local_resources(path, FileType[file_type.upper()])

    def api_load_remote_resources(self, url: str, file_type: str, file_category=None):
        """Load resources from a URL; ``file_category`` is passed on as ``sub_category``."""
        return self.add_remote_resources(url, FileType[file_type.upper()], sub_category=file_category)

    def api_load_crds(self, path: Path, file_type: str):
        """Load CRDs from a local path; ``file_type`` names a ``FileType`` member (case-insensitive)."""
        return self.add_local_crds(path, FileType[file_type.upper()])

    def api_load_remote_crds(self, url: str, file_type: str, file_category=None):
        """Load CRDs from a URL; ``file_category`` is passed on as ``sub_category``."""
        return self.add_remote_crds(url, FileType[file_type.upper()], sub_category=file_category)

    def api_add_transformer(self, transformer):
        """Register ``transformer`` unless it is already registered."""
        if transformer not in self._transformers:
            self._transformers.append(transformer)

    def api_add_validator(self, validator):
        """Register ``validator`` unless it is already registered."""
        if validator not in self._validators:
            self._validators.append(validator)

    def api_remove_transformer(self, transformer):
        """Unregister ``transformer``; unknown transformers are ignored."""
        if transformer in self._transformers:
            self._transformers.remove(transformer)

    def api_remove_validator(self, validator):
        """Unregister ``validator``; unknown validators are ignored."""
        # Mirrors api_remove_transformer: only remove what is actually registered
        if validator in self._validators:
            self._validators.remove(validator)

    def api_validation_error(self, msg, *args):
        """Build a ``ValueError`` whose traceback points at the caller's stack.

        :param msg: message, ``%``-formatted with ``args`` when any are given
        :return: the exception (not raised) with a synthetic traceback attached
        """
        frame = sys._getframe().f_back
        tb = None
        # Walk outwards from the caller, chaining one traceback entry per frame
        while True:
            if not frame:
                break
            tb = types.TracebackType(tb, frame, frame.f_lasti, frame.f_lineno)
            frame = frame.f_back
        return ValueError((msg % args) if args else msg).with_traceback(tb)

    def _transform_resource(self, resources: Sequence[K8SResource], resource: K8SResource) -> K8SResource:
        """Run the transformers, most recently added first, and return the resulting resource.

        A transformer returning a falsy value leaves the current resource in place.
        """
        for transformer in reversed(self._transformers):
            logger.debug("Applying transformer %s to %s from %s",
                         getattr(transformer, "__name__", transformer),
                         resource, resource.source)
            resource = transformer(resources, resource) or resource

        return resource

    def _validate_resources(self):
        """Run every validator (most recently added first) against every resource.

        All collected errors are logged; the first one is raised.
        """
        errors: list[Exception] = []
        for resource in self.resources.values():
            for validator in reversed(self._validators):
                logger.debug("Applying validator %s to %s from %s",
                             getattr(validator, "__name__", validator),
                             resource, resource.source)
                errors.extend(validator(self.resources, resource, self.api_validation_error))
        if errors:
            for error in errors:
                logger.error("Validation error: %s", error)
            raise errors[0]

    def _apply_resource(self,
                        dry_run,
                        patch_field_excludes: Iterable[re.Pattern],
                        resource: K8SResource,
                        patch_func: Callable[[Iterable[dict]], None],
                        create_func: Callable[[], None],
                        delete_func: Callable[..., None],
                        status_msg):
        """Bring a single resource to its desired state.

        The resource is created when the server answers 404. Otherwise a server-side
        dry-run patch yields the merged resource, which is diffed against the current
        remote state; the filtered JSON patch is applied through ``patch_func``. When
        the dry-run patch is rejected with 422 because of an immutable field and the
        resource kind is listed in ``immutable_changes``, the resource is deleted and
        re-created.

        :param dry_run: whether the run is a dry run (affects the re-create after delete)
        :param patch_field_excludes: compiled patterns; patch operations whose path matches are dropped
        :param resource: the resource to apply
        :param patch_func: called with the final JSON patch
        :param create_func: called without arguments to create the resource
        :param delete_func: called with the ``propagation_policy`` keyword to delete the resource
        :param status_msg: suffix appended to log messages
        :return: ``(created, patched, deleted)`` counts for this resource
        :raises ApiException: for any API failure that is not handled here
        """
        from kubernetes import client
        from kubernetes.client.rest import ApiException

        rdef = resource.rdef
        rdef.populate_api(client, self.context.k8s.client)

        def handle_400_strict_validation_error(e: ApiException):
            # Reports a 400 "Failure" status and re-raises ``e``. For anything else it
            # returns None, which the callers treat as "not handled" and re-raise themselves.
            if e.status == 400:
                status = json.loads(e.body)

                if status["status"] == "Failure":
                    if FIELD_VALIDATION_STRICT_MARKER in status["message"]:
                        message = status["message"]
                        # Everything after the marker is a comma-separated list of warnings
                        messages = message[message.find(FIELD_VALIDATION_STRICT_MARKER) +
                                           len(FIELD_VALIDATION_STRICT_MARKER):].split(",")
                        for m in messages:
                            self._api_warnings(resource, m.strip())

                        raise e from None
                    else:
                        logger.error("FAILED MODIFYING resource %s from %s: %s",
                                     resource, resource.source, status["message"])
                        raise e from None

        def create(exists_ok=False, wait_for_delete=False):
            # Creates the resource. On a 409 "AlreadyExists" it either retries after a
            # delay (wait_for_delete) or returns quietly (exists_ok).
            logger.info("Creating resource %s%s%s", resource, status_msg,
                        " (ignoring existing)" if exists_ok else "")
            while True:
                try:
                    create_func()
                    return
                except ApiException as e:
                    if exists_ok or wait_for_delete:
                        if e.status == 409:
                            status = json.loads(e.body)
                            if status["reason"] == "AlreadyExists":
                                if wait_for_delete:
                                    sleep(self.context.k8s.conflict_retry_delay)
                                    logger.info("Retry creating resource %s%s%s", resource, status_msg,
                                                " (ignoring existing)" if exists_ok else "")
                                    continue
                                else:
                                    return
                    raise

        merge_instrs, normalized_manifest = extract_merge_instructions(resource.manifest, resource)
        if merge_instrs:
            logger.trace("Normalized manifest (no merge instructions) for resource %s: %s", resource,
                         normalized_manifest)
        else:
            normalized_manifest = resource.manifest

        logger.debug("Applying resource %s%s", resource, status_msg)
        try:
            remote_resource = resource.get()
            logger.trace("Current resource %s: %s", resource, remote_resource)
        except ApiException as e:
            if e.status == 404:
                try:
                    create()
                    return 1, 0, 0
                except ApiException as e:
                    if not handle_400_strict_validation_error(e):
                        raise

            else:
                raise
        else:
            logger.trace("Attempting to retrieve a normalized patch for resource %s: %s", resource, normalized_manifest)
            try:
                merged_resource = resource.patch(normalized_manifest,
                                                 patch_type=K8SResourcePatchType.SERVER_SIDE_PATCH,
                                                 dry_run=True,
                                                 force=True)
            except ApiException as e:
                if e.status == 422:
                    status = json.loads(e.body)
                    details = status["details"]
                    immutable_key = details.get("group"), details["kind"]

                    try:
                        propagation_policy = self.context.k8s.immutable_changes[immutable_key]
                    except KeyError:
                        raise e from None
                    else:
                        for cause in details["causes"]:
                            if (
                                    cause["reason"] == "FieldValueInvalid" and
                                    "field is immutable" in cause["message"]
                                    or
                                    cause["reason"] == "FieldValueForbidden" and
                                    ("Forbidden: updates to" in cause["message"]
                                     or
                                     "Forbidden: pod updates" in cause["message"])
                            ):
                                logger.info("Deleting resource %s (cascade %s)%s", resource,
                                            propagation_policy.policy,
                                            status_msg)
                                delete_func(propagation_policy=propagation_policy)
                                create(exists_ok=dry_run, wait_for_delete=not dry_run)
                                return 1, 0, 1
                        # No cause describes an immutable-field change: propagate the 422
                        raise
                else:
                    if not handle_400_strict_validation_error(e):
                        raise
            else:
                logger.trace("Merged resource %s: %s", resource, merged_resource)
                if merge_instrs:
                    apply_merge_instructions(merge_instrs, normalized_manifest, merged_resource, logger, resource)

                patch = jsonpatch.make_patch(remote_resource, merged_resource)
                logger.trace("Resource %s initial patches are: %s", resource, patch)
                patch = self._filter_resource_patch(patch, patch_field_excludes)
                logger.trace("Resource %s final patches are: %s", resource, patch)
                if patch:
                    logger.info("Patching resource %s%s", resource, status_msg)
                    patch_func(patch)
                    return 0, 1, 0
                else:
                    logger.info("Nothing to patch for resource %s", resource)
                    return 0, 0, 0

    def _filter_resource_patch(self, patch: Iterable[Mapping], excludes: Iterable[re.Pattern]):
        """Return the patch operations whose ``path`` matches none of the ``excludes`` patterns.

        :param patch: JSON patch operations, each a mapping with a ``"path"`` entry
        :param excludes: compiled patterns matched against the start of each path
        :return: list of the operations that were kept, in their original order
        """
        result = []
        for op in patch:
            path = op["path"]
            excluded = False
            for exclude in excludes:
                if exclude.match(path):
                    logger.trace("Excluding %r from patch %s", op, patch)
                    excluded = True
                    break
            if excluded:
                continue
            result.append(op)
        return result

    def _setup_k8s_client(self):
        """Create an ``ApiClient``, preferring in-cluster configuration over the kubeconfig file.

        The client's content type selector is wrapped so that JSON patch can be sent
        (see ``_select_header_content_type_patch``).
        """
        from kubernetes import client
        from kubernetes.config import load_incluster_config, load_kube_config, ConfigException

        try:
            logger.debug("Trying K8S in-cluster configuration")
            load_incluster_config()
            logger.info("Running K8S with in-cluster configuration")
        except ConfigException as e:
            logger.trace("K8S in-cluster configuration failed", exc_info=e)
            logger.debug("Initializing K8S with kubeconfig configuration")
            load_kube_config(config_file=self.context.kubeconfig.kubeconfig)

        k8s_client = client.ApiClient()

        # Patch the header content type selector to allow json patch
        k8s_client._select_header_content_type = k8s_client.select_header_content_type
        k8s_client.select_header_content_type = self._select_header_content_type_patch

        return k8s_client

    def _select_header_content_type_patch(self, content_types):
        """Returns `Content-Type` based on an array of content_types provided.

        Delegates to the client's original selector and replaces
        ``application/merge-patch+json`` with ``application/json-patch+json``.

        :param content_types: List of content-types.
        :return: Content-Type (e.g. application/json).
        """

        content_type = self.context.k8s.client._select_header_content_type(content_types)
        if content_type == "application/merge-patch+json":
            return "application/json-patch+json"
        return content_type

    def __repr__(self):
        return "Kubernetes Plugin"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc