karellen / kubernator / 24580658204

17 Apr 2026 06:29PM UTC coverage: 77.939% (+1.0%) from 76.951%

push

github

web-flow
Refactor k8s apply: SSA test-ops, 409 retry, watch-based delete, imperative API (#102)

## Summary

- Harden the K8s apply path with SSA-derived JSON-patch `test` ops for
`/metadata/uid` and `/metadata/resourceVersion`, and wrap the SSA patch
branch in a 409-retry loop that recomputes the patch on conflict.
- Extend `K8SResource` with `list()`, `watch()`, and a `delete(wait=True)`
that waits for deletion using watch plus a `get()` recheck, and add a
`resource_generator()` extension seam on `KubernetesPlugin`.
- Add `ktor.k8s.resource(manifest)` — a fully-wired `K8SResource`
factory for imperative CRUD from `.kubernator.py` scripts, bypassing the
declarative apply lifecycle.
- Thread the applied manifest as a fourth return value from `_apply_resource`
and its inner patch/create/delete callables for external consumers.
- Move dump-file open/close from `app.py` into
`KubernetesPlugin.handle_apply`; fix dump-mode `patch_func` to seed
uid/RV on a copy of the local manifest by extracting them from the
patch's own test ops (via `jsonpointer`), so the simulated post-patch
manifest can be returned honestly.
- Rewrite `delete_create` and `resource_version_merge` integration tests
as single-phase using the new imperative API, replacing `kubectl`-based
phase-1 setup and the `TEST_PHASE` env re-invocation.
- Add a unit test for the 409 retry and integration tests for `watch()` and
`resource_generator`. Port `resource_version_merge` from minikube to
kind.
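
The guarded-patch idea in the first bullet can be illustrated with a minimal sketch. It is not the kubernator implementation: `ConflictError`, `build_guarded_patch`, `apply_with_retry` and the three callables are hypothetical names introduced here, and the conflict is modelled by a plain exception rather than a real API response.

```python
import copy


class ConflictError(Exception):
    """Hypothetical stand-in for an HTTP 409 conflict (not a kubernator or kubernetes class)."""


def build_guarded_patch(live, changes):
    """Prefix JSON-patch ops with `test` ops pinning the live uid and resourceVersion."""
    meta = live["metadata"]
    guards = [
        {"op": "test", "path": "/metadata/uid", "value": meta["uid"]},
        {"op": "test", "path": "/metadata/resourceVersion", "value": meta["resourceVersion"]},
    ]
    return guards + copy.deepcopy(changes)


def apply_with_retry(get_live, compute_changes, send_patch, max_attempts=3):
    """Re-read the live object and recompute the patch after every conflict."""
    for attempt in range(1, max_attempts + 1):
        live = get_live()
        patch = build_guarded_patch(live, compute_changes(live))
        try:
            return send_patch(patch)
        except ConflictError:
            if attempt == max_attempts:
                raise  # give up once the retry budget is exhausted
```

The point of recomputing inside the loop is that a stale patch would carry stale `test` values and fail again; each attempt starts from a fresh read of the object.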

## Test plan

- [x] `pyb -vX run_unit_tests` (32 tests, all green — includes new
`k8s_apply_retry_tests`)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=delete_create_tests.py` (single-phase, kind,
immutable-field delete+recreate path)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=resource_version_merge_tests.py`
(single-phase, kind, dump mode patch verification)
- [ ] Full `pyb -vX run_integration_tests` suite via CI across the
Python 3.10–3.14 matrix

614 of 976 branches covered (62.91%)

Branch coverage included in aggregate %.

137 of 163 new or added lines in 4 files covered. (84.05%)

5 existing lines in 3 files now uncovered.

3039 of 3711 relevant lines covered (81.89%)

4.09 hits per line
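
The percentages above can be cross-checked with a few lines of arithmetic. The sketch below assumes the aggregate figure pools covered lines and covered branches over relevant lines plus branches; that assumption is an inference from the numbers on this page, which it reproduces, and not a documented formula.

```python
# Counts copied from the report above.
lines_covered, lines_relevant = 3039, 3711
branches_covered, branches_total = 614, 976
new_covered, new_total = 137, 163

line_pct = 100 * lines_covered / lines_relevant
branch_pct = 100 * branches_covered / branches_total
new_pct = 100 * new_covered / new_total

# Assumed pooling: lines and branches counted together in one ratio.
aggregate_pct = 100 * (lines_covered + branches_covered) / (lines_relevant + branches_total)

print(f"lines {line_pct:.2f}%, branches {branch_pct:.2f}%, "
      f"new lines {new_pct:.2f}%, aggregate {aggregate_pct:.3f}%")
# prints: lines 81.89%, branches 62.91%, new lines 84.05%, aggregate 77.939%
```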

Source File

88.84
/src/main/python/kubernator/plugins/istio.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import json
5✔
20
import logging
5✔
21
import os
5✔
22
import tarfile
5✔
23
import tempfile
5✔
24
from pathlib import Path
5✔
25
from shutil import which
5✔
26

27
import yaml
5✔
28

29
from kubernator.api import (KubernatorPlugin, scan_dir,
5✔
30
                            TemplateEngine,
31
                            load_remote_file,
32
                            FileType,
33
                            StripNL,
34
                            Globs,
35
                            get_golang_os,
36
                            get_golang_machine,
37
                            prepend_os_path, jp, load_file)
38
from kubernator.plugins.k8s_api import api_exc_format_body
5✔
39
from kubernator.plugins.k8s_api import K8SResourcePluginMixin
5✔
40

41
logger = logging.getLogger("kubernator.istio")
5✔
42
proc_logger = logger.getChild("proc")
5✔
43
stdout_logger = StripNL(proc_logger.info)
5✔
44
stderr_logger = StripNL(proc_logger.warning)
5✔
45

46
MESH_PILOT_JP = jp('$.meshVersion[?Component="pilot"].Info.version')
5✔
47

48

49
class IstioPlugin(KubernatorPlugin, K8SResourcePluginMixin):
5✔
50
    logger = logger
5✔
51
    _name = "istio"
5✔
52

53
    def __init__(self):
5✔
54
        self.context = None
5✔
55
        self.client_version = None
5✔
56
        self.server_version = None
5✔
57
        self.provision_operator = False
5✔
58
        self.install = False
5✔
59
        self.upgrade = False
5✔
60
        self.upgrade_from_operator = False
5✔
61
        self.template_engine = TemplateEngine(logger)
5✔
62

63
        self.istioctl_dir = None
5✔
64

65
        super().__init__()
5✔
66

67
    def register(self, version=None):
5✔
68
        context = self.context
5✔
69
        context.app.register_plugin("kubeconfig")
5✔
70
        context.app.assert_plugin("k8s", self)
5✔
71

72
        if version:
5!
73
            # Download and use specific version
74
            istioctl_os = get_golang_os()
5✔
75
            if istioctl_os == "darwin":
5!
76
                istioctl_os = "osx"
×
77
                istioctl_machine = get_golang_machine()
×
78
                if istioctl_machine == "amd64":
×
79
                    istioctl_platform = istioctl_os
×
80
                else:
81
                    istioctl_platform = f"{istioctl_os}-{istioctl_machine}"
×
82
            else:
83
                istioctl_platform = f"{istioctl_os}-{get_golang_machine()}"
5✔
84
            istioctl_url = (f"https://github.com/istio/istio/releases/download/{version}/"
5✔
85
                            f"istioctl-{version}-{istioctl_platform}.tar.gz")
86
            istioctl_file_dl, _ = context.app.download_remote_file(logger, istioctl_url, "bin")
5✔
87
            istioctl_file_dl = str(istioctl_file_dl)
5✔
88
            self.istioctl_dir = tempfile.TemporaryDirectory()
5✔
89
            context.app.register_cleanup(self.istioctl_dir)
5✔
90

91
            istioctl_file = str(Path(self.istioctl_dir.name) / "istioctl")
5✔
92
            istio_tar = tarfile.open(istioctl_file_dl)
5✔
93
            istio_tar.extractall(self.istioctl_dir.name)
5✔
94

95
            os.chmod(istioctl_file, 0o500)
5✔
96
            prepend_os_path(self.istioctl_dir.name)
5✔
97
        else:
98
            # Use current version
99
            istioctl_file = which("istioctl")
×
100
            if not istioctl_file:
×
101
                raise RuntimeError("`istioctl` cannot be found and no version has been specified")
×
102

103
            logger.debug("Found istioctl in %r", istioctl_file)
×
104

105
        context.globals.istio = dict(
5✔
106
            default_includes=Globs(["*.istio.yaml", "*.istio.yml"], True),
107
            default_excludes=Globs([".*"], True),
108
            istioctl_file=istioctl_file,
109
            stanza=self.stanza,
110
            test=self.test_istioctl
111
        )
112

113
    def test_istioctl(self):
5✔
114
        context = self.context
5✔
115
        version_out: str = context.app.run_capturing_out(context.istio.stanza() + ["version", "-o", "json"],
5✔
116
                                                         stderr_logger)
117

118
        version_out_js = json.loads(version_out)
5✔
119
        version = version_out_js["clientVersion"]["version"]
5✔
120
        logger.info("Using istioctl %r version %r with stanza %r",
5✔
121
                    self.context.istio.istioctl_file, version, context.istio.stanza())
122

123
        logger.info("Found Istio client version %s", version)
5✔
124

125
        return version, version_out_js
5✔
126

127
    def set_context(self, context):
5✔
128
        self.context = context
5✔
129

130
    def stanza(self):
5✔
131
        context = self.context.istio
5✔
132
        return [context.istioctl_file, f"--kubeconfig={self.context.kubeconfig.kubeconfig}"]
5✔
133

134
    def handle_init(self):
5✔
135
        pass
5✔
136

137
    def handle_start(self):
5✔
138
        context = self.context
5✔
139

140
        version, version_out_js = self.test_istioctl()
5✔
141
        self.client_version = tuple(map(int, version.split(".")))
5✔
142
        mesh_versions = set(tuple(map(int, m.value.split("."))) for m in MESH_PILOT_JP.find(version_out_js))
5✔
143

144
        if mesh_versions:
5✔
145
            self.server_version = max(mesh_versions)
5✔
146

147
        if not self.server_version:
5✔
148
            logger.info("No Istio mesh has been found and it'll be created")
5✔
149
            self.install = True
5✔
150
            self.provision_operator = True
5✔
151
        elif self.server_version != self.client_version:
5!
152
            logger.info("Istio client is version %s while server is up to %s - up/downgrade will be performed",
5✔
153
                        ".".join(map(str, self.client_version)),
154
                        ".".join(map(str, self.server_version)))
155
            self.upgrade = True
5✔
156
            self.provision_operator = True
5✔
157

158
        if self.client_version >= (1, 24, 0):
5✔
159
            # No more operator in 1.24.0+
160
            self.provision_operator = False
5✔
161

162
        if self.upgrade and (self.client_version >= (1, 24, 0) > self.server_version):
5✔
163
            self.upgrade_from_operator = True
5✔
164

165
        if self.upgrade and (self.client_version < (1, 24, 0) <= self.server_version):
5✔
166
            raise ValueError(f"Unable to downgrade Istio from {self.server_version} to {self.client_version}")
5✔
167

168
        # Register Istio-related CRDs with K8S
169
        if self.client_version >= (1, 24, 0):
5✔
170
            crd_path = "manifests/charts/base/files/crd-all.gen.yaml"
5✔
171
        else:
172
            crd_path = "manifests/charts/base/crds/crd-all.gen.yaml"
5✔
173
        self.context.k8s.load_remote_crds(
5✔
174
            f"https://raw.githubusercontent.com/istio/istio/{'.'.join(map(str, self.client_version))}/{crd_path}",
175
            "yaml")
176

177
        # This plugin only deals with Istio Operator, so only load that stuff
178
        self.resource_definitions_schema = load_remote_file(logger,
5✔
179
                                                            f"https://raw.githubusercontent.com/kubernetes/kubernetes/"
180
                                                            f"{self.context.k8s.server_git_version}"
181
                                                            f"/api/openapi-spec/swagger.json",
182
                                                            FileType.JSON)
183
        self._populate_resource_definitions()
5✔
184

185
        crd_operator_version = (1, 23, 4) if self.client_version >= (1, 24, 0) else self.client_version
5✔
186
        self.add_remote_crds(
5✔
187
            f"https://raw.githubusercontent.com/istio/istio/{'.'.join(map(str, crd_operator_version))}/"
188
            f"manifests/charts/istio-operator/crds/crd-operator.yaml", FileType.YAML)
189

190
        # Exclude Istio YAMLs from K8S resource loading
191
        context.k8s.default_excludes.add("*.istio.yaml")
5✔
192
        context.k8s.default_excludes.add("*.istio.yml")
5✔
193

194
    def handle_before_dir(self, cwd: Path):
5✔
195
        context = self.context
5✔
196

197
        context.istio.default_includes = Globs(context.istio.default_includes)
5✔
198
        context.istio.default_excludes = Globs(context.istio.default_excludes)
5✔
199
        context.istio.includes = Globs(context.istio.default_includes)
5✔
200
        context.istio.excludes = Globs(context.istio.default_excludes)
5✔
201

202
        # Exclude Istio YAMLs from K8S resource loading
203
        context.k8s.excludes.add("*.istio.yaml")
5✔
204
        context.k8s.excludes.add("*.istio.yml")
5✔
205

206
    def handle_after_dir(self, cwd: Path):
5✔
207
        context = self.context
5✔
208
        istio = context.istio
5✔
209

210
        for f in scan_dir(logger, cwd, lambda d: d.is_file(), istio.excludes, istio.includes):
5✔
211
            p = cwd / f.name
5✔
212
            display_p = context.app.display_path(p)
5✔
213
            logger.info("Adding Istio Operator from %s", display_p)
5✔
214

215
            manifests = load_file(logger, p, FileType.YAML, display_p,
5✔
216
                                  self.template_engine,
217
                                  {"ktor": context})
218

219
            self.add_resources(manifests, display_p)
5✔
220

221
    def handle_apply(self):
5✔
222
        context = self.context
5✔
223

224
        if not self.resources:
5✔
225
            logger.info("Skipping Istio as no Operator was processed")
5✔
226
        else:
227
            with tempfile.NamedTemporaryFile(mode="wt", delete=False) as operators_file:
5✔
228
                logger.info("Saving Istio Operators to %s", operators_file.name)
5✔
229
                yaml.safe_dump_all((r.manifest for r in self.resources.values()), operators_file)
5✔
230

231
            if context.app.args.command == "apply":
5!
232
                logger.info("Running Istio precheck")
5✔
233
                context.app.run(context.istio.stanza() + ["x", "precheck"],
5✔
234
                                stdout_logger, stderr_logger).wait()
235
                context.app.run(context.istio.stanza() + ["validate", "-f", operators_file.name],
5✔
236
                                stdout_logger, stderr_logger).wait()
237

238
                dry_run = context.app.args.dry_run
5✔
239

240
                if self.provision_operator:
5✔
241
                    self._create_istio_system_ns(True)
5✔
242
                    self._operator_init(operators_file, True)
5✔
243

244
                    if not dry_run:
5!
245
                        self._create_istio_system_ns(False)
5✔
246
                        self._operator_init(operators_file, False)
5✔
247
                elif self.install:
5✔
248
                    self._install(operators_file, True)
5✔
249

250
                    if not dry_run:
5!
251
                        self._install(operators_file, False)
5✔
252
                elif self.upgrade:
5!
253
                    def _upgrade(dry_run):
5✔
254
                        if self.upgrade_from_operator:
5✔
255
                            # delete deployment -n istio-system istio-operator
256
                            self._delete_resource_internal({"apiVersion": "apps/v1",
5✔
257
                                                            "kind": "Deployment",
258
                                                            "metadata":
259
                                                                {
260
                                                                    "namespace": "istio-system",
261
                                                                    "name": "istio-operator"
262
                                                                }
263
                                                            }, dry_run, True)
264
                        self._upgrade(operators_file, dry_run)
5✔
265

266
                    _upgrade(True)
5✔
267
                    if not dry_run:
5!
268
                        _upgrade(False)
5✔
269

270
    def _delete_resource_internal(self, manifest, dry_run=True, missing_ok=False):
5✔
271
        from kubernetes import client
5✔
272
        from kubernetes.client.rest import ApiException
5✔
273

274
        context = self.context
5✔
275
        k8s_client = context.k8s.client
5✔
276

277
        res = self._create_resource(manifest)
5✔
278
        res.rdef.populate_api(client, k8s_client)
5✔
279
        try:
5✔
280
            res.delete(dry_run=dry_run)
5✔
281
        except ApiException as e:
5✔
282
            if e.status == 404 and missing_ok:
5!
283
                return res
5✔
NEW
284
            api_exc_format_body(e)
×
NEW
285
            raise
×
286

UNCOV
287
        return res
×
288

289
    def _create_resource_internal(self, manifest, dry_run=True, exists_ok=False):
5✔
290
        from kubernetes import client
5✔
291
        from kubernetes.client.rest import ApiException
5✔
292

293
        context = self.context
5✔
294
        k8s_client = context.k8s.client
5✔
295

296
        res = self._create_resource(manifest)
5✔
297
        res.rdef.populate_api(client, k8s_client)
5✔
298
        try:
5✔
299
            res.create(dry_run=dry_run)
5✔
300
        except ApiException as e:
5✔
301
            if e.status == 409 and e.body.get("reason") == "AlreadyExists" and exists_ok:
5!
302
                return res
5✔
NEW
303
            api_exc_format_body(e)
×
NEW
304
            raise
×
305

306
        return res
5✔
307

308
    def _install(self, operators_file, dry_run):
5✔
309
        context = self.context
5✔
310
        status_details = " (dry run)" if dry_run else ""
5✔
311

312
        logger.info("Running Istio install%s", status_details)
5✔
313
        istio_install_cmd = context.istio.stanza() + ["install", "-f", operators_file.name, "-y", "--verify"]
5✔
314
        context.app.run(istio_install_cmd + (["--dry-run"] if dry_run else []),
5✔
315
                        stdout_logger,
316
                        stderr_logger).wait()
317

318
    def _upgrade(self, operators_file, dry_run):
5✔
319
        context = self.context
5✔
320
        status_details = " (dry run)" if dry_run else ""
5✔
321

322
        logger.info("Running Istio upgrade%s", status_details)
5✔
323
        istio_upgrade_cmd = context.istio.stanza() + ["upgrade", "-f", operators_file.name, "-y", "--verify"]
5✔
324
        context.app.run(istio_upgrade_cmd + (["--dry-run"] if dry_run else []),
5✔
325
                        stdout_logger,
326
                        stderr_logger).wait()
327

328
    def _create_istio_system_ns(self, dry_run):
5✔
329
        status_details = " (dry run)" if dry_run else ""
5✔
330
        logger.info("Creating istio-system namespace%s", status_details)
5✔
331
        self._create_resource_internal({"apiVersion": "v1",
5✔
332
                                        "kind": "Namespace",
333
                                        "metadata": {
334
                                            "labels": {
335
                                                "istio-injection": "disabled"
336
                                            },
337
                                            "name": "istio-system"
338
                                        }
339
                                        },
340
                                       dry_run=dry_run,
341
                                       exists_ok=True
342
                                       )
343

344
    def _operator_init(self, operators_file, dry_run):
5✔
345
        context = self.context
5✔
346
        status_details = " (dry run)" if dry_run else ""
5✔
347

348
        logger.info("Running Istio operator init%s", status_details)
5✔
349
        istio_operator_init = context.istio.stanza() + ["operator", "init", "-f", operators_file.name]
5✔
350
        context.app.run(istio_operator_init + (["--dry-run"] if dry_run else []),
5✔
351
                        stdout_logger,
352
                        stderr_logger).wait()
353

354
    def __repr__(self):
5✔
355
        return "Istio Plugin"
5✔
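
The version gating in `handle_start` is easier to follow as a pure function. The sketch below mirrors the listed logic under the assumption that versions are plain integer tuples; `plan_istio_actions`, `parse_version` and `OPERATOR_REMOVED` are hypothetical names that are not part of the plugin.

```python
OPERATOR_REMOVED = (1, 24, 0)  # the listing treats 1.24.0+ as operator-less


def parse_version(text):
    """Turn '1.23.4' into (1, 23, 4) so versions compare element by element."""
    return tuple(map(int, text.split(".")))


def plan_istio_actions(client_version, server_version):
    """Return the install/upgrade flags for a client and (optional) server version."""
    install = upgrade = provision_operator = upgrade_from_operator = False

    if not server_version:
        # No mesh found: fresh install.
        install = provision_operator = True
    elif server_version != client_version:
        upgrade = provision_operator = True

    if client_version >= OPERATOR_REMOVED:
        provision_operator = False

    # Chained comparison: client is at or past the cutoff while the server is before it.
    if upgrade and client_version >= OPERATOR_REMOVED > server_version:
        upgrade_from_operator = True

    # Downgrading across the cutoff is rejected.
    if upgrade and client_version < OPERATOR_REMOVED <= server_version:
        raise ValueError(f"Unable to downgrade Istio from {server_version} to {client_version}")

    return {
        "install": install,
        "upgrade": upgrade,
        "provision_operator": provision_operator,
        "upgrade_from_operator": upgrade_from_operator,
    }
```

For example, `plan_istio_actions(parse_version("1.24.0"), parse_version("1.23.4"))` sets both `upgrade` and `upgrade_from_operator`, which corresponds to the branch in `handle_apply` that deletes the `istio-operator` deployment before upgrading.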