karellen / kubernator / 24621395421

19 Apr 2026 04:58AM UTC coverage: 81.992% (+4.1%) from 77.939%
push

github

web-flow
Add OpenAPI v3 validation with CEL rules; extract validator into k8s_schema (#104)

## Summary

Extracts all OpenAPI validation out of `k8s.py` / `k8s_api.py` into a
new `kubernator/plugins/k8s_schema/` package, and adds an OpenAPI v3
validator with client-side CEL rule evaluation as the default on
clusters ≥ 1.27.

- **`k8s_api.py` no longer does validation** — `K8SResourcePluginMixin`
holds a single `self.validator` and delegates every manifest check
(minimal envelope → rdef lookup → full schema → CEL rules) to it.
Dependency graph is one-way: `k8s_schema → k8s_api`.
- **Two validators behind a factory.** `SwaggerV2Validator` preserves
current behaviour (swagger.json + OAS31). `OpenAPIV3Validator` is new:
cluster-first `/openapi/v3` discovery with GitHub fallback, lazy
per-group fetch, transitive `$ref` closure across group documents.
- **K8s extensions enforced client-side** under v3:
`x-kubernetes-list-type` (`atomic` / `set` / `map` with
`list-map-keys`), `-preserve-unknown-fields`, `-embedded-resource`,
`-int-or-string`.
- **CEL rules evaluated client-side.** Lazy-compiled, cached by rule
text for the run. Ports of the K8s CEL libraries — `lists`, `regex`,
`format.named(...)`, `quantity`, `IP`, `CIDR` — plus a compatibility
shim for `optional.of` / `optional.none` / `hasValue` / `value` /
`orValue` so `optionalSelf` / `optionalOldSelf` work. Transition rules
fire in the apply path against the cluster's current state.
- **Selection.** `ktor.k8s.register()` gains `openapi_version` (`auto` /
`v2` / `v3`) and `openapi_source` (`auto` / `cluster` / `github`) kwargs
(also readable from context). `auto` gates on server minor ≥ 1.27 and
falls back to v2 on any v3 failure. `istio.py` is migrated to the same
factory.
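As a rough illustration of the selection rule in the last bullet, the sketch below resolves `openapi_version` to a concrete validator. It is not the code from this change: `select_validator`, `make_v2`, and `make_v3` are hypothetical names, the server version is assumed to arrive as a `(major, minor)` tuple, and letting an explicit `v3` request fail loudly instead of falling back is an assumption.

```python
from typing import Callable, Tuple, TypeVar

V = TypeVar("V")

# Threshold taken from the summary: v3 is the default on clusters >= 1.27.
MIN_V3_SERVER = (1, 27)


def select_validator(openapi_version: str,
                     server_version: Tuple[int, int],
                     make_v2: Callable[[], V],
                     make_v3: Callable[[], V]) -> Tuple[str, V]:
    """Return (chosen_version, validator) for 'auto', 'v2' or 'v3'."""
    if openapi_version not in ("auto", "v2", "v3"):
        raise ValueError(f"unknown openapi_version: {openapi_version!r}")
    if openapi_version == "v2":
        return "v2", make_v2()
    if openapi_version == "v3":
        # Assumption: an explicit request is honoured and errors propagate.
        return "v3", make_v3()
    # 'auto': try v3 only on new-enough servers, fall back to v2 on any failure.
    if server_version >= MIN_V3_SERVER:
        try:
            return "v3", make_v3()
        except Exception:
            pass
    return "v2", make_v2()
```

Passing the two constructors as callables keeps the v3 discovery work lazy: nothing is fetched for a validator that is never selected.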

New dep: `cel-python ~=0.5`.
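The "lazy-compiled, cached by rule text" behaviour described for CEL rules can be pictured with a small cache keyed on the rule string. This is only a sketch under assumptions: `RuleCache` and `compile_fn` are hypothetical names, and the compiler is injected so the example does not depend on any particular `cel-python` call.

```python
from typing import Callable, Dict, Generic, TypeVar

P = TypeVar("P")


class RuleCache(Generic[P]):
    """Compile each distinct rule text at most once per run."""

    def __init__(self, compile_fn: Callable[[str], P]) -> None:
        # Hypothetical hook: in practice this would wrap the real CEL compiler.
        self._compile_fn = compile_fn
        self._programs: Dict[str, P] = {}

    def get(self, rule_text: str) -> P:
        # Compilation is deferred until a rule is first needed.
        if rule_text not in self._programs:
            self._programs[rule_text] = self._compile_fn(rule_text)
        return self._programs[rule_text]
```

Keying on the rule text means identical rules that appear in several schemas share one compiled program.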

## Test plan

- [x] 111 unit tests (v2 parity; v3 lazy fetch; api_versions without
sub-doc fetch; source fallback; every extension keyword; CEL walker +
evaluator; 7 extension libraries; factory selection / vers... (continued)

943 of 1328 branches covered (71.01%)

Branch coverage included in aggregate %.

1040 of 1082 new or added lines in 18 files covered. (96.12%)

6 existing lines in 2 files now uncovered.

3979 of 4675 relevant lines covered (85.11%)

4.25 hits per line

Source File

77.75
/src/main/python/kubernator/plugins/k3d.py
# -*- coding: utf-8 -*-
#
#   Copyright 2020 Express Systems USA, Inc
#   Copyright 2024 Karellen, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
import http.client
import json
import logging
import os
import socket
import ssl
import tempfile
import time
from pathlib import Path
from urllib.parse import urlparse

import yaml

from kubernator.api import (KubernatorPlugin,
                            StripNL,
                            get_golang_os,
                            get_golang_machine,
                            prepend_os_path,
                            get_cache_dir,
                            CalledProcessError,
                            )

logger = logging.getLogger("kubernator.k3d")
proc_logger = logger.getChild("proc")
stdout_logger = StripNL(proc_logger.info)
stderr_logger = StripNL(proc_logger.warning)

K3D_CLUSTER_LABEL = "k3d.cluster"

DEFAULT_NODE_IMAGE_REGISTRY = "rancher/k3s"
DEFAULT_NODE_IMAGE_SUFFIX = "-k3s1"


class K3dPlugin(KubernatorPlugin):
    logger = logger

    _name = "k3d"

    def __init__(self):
        self.context = None
        self.k3d_dir = None
        self.kubeconfig_dir = None
        self._config_path = None
        super().__init__()

    def set_context(self, context):
        self.context = context

    def _resolve_latest_tag(self, repo_url):
        """Return highest v<major>.<minor>.<patch> tag from a GitHub repo via git ls-remote.
        Skips pre-release tags like v0.1.0-alpha, v0.2.0-rc.1."""
        versions = self.context.app.run_capturing_out(
            ["git", "ls-remote", "-t", "--refs", repo_url, "v*"],
            stderr_logger,
        )
        tuples = []
        for line in versions.splitlines():
            if not line:
                continue
            # "<sha>\trefs/tags/v1.2.3" -> "1.2.3"
            tag = line.split()[1][11:]
            parts = tag.split(".")
            if len(parts) != 3:
                continue
            if not all(p.isdigit() for p in parts):
                continue
            tuples.append(tuple(int(p) for p in parts))
        if not tuples:
            raise RuntimeError(f"No numeric v<major>.<minor>.<patch> tags found at {repo_url}")
        return ".".join(str(x) for x in sorted(tuples)[-1])

    def get_latest_k3d_version(self):
        return self._resolve_latest_tag("https://github.com/k3d-io/k3d")

    def cmd(self, *extra_args):
        stanza, env = self._stanza(list(extra_args))
        return self.context.app.run(stanza, stdout_logger, stderr_logger, env=env).wait()

    def cmd_out(self, *extra_args):
        stanza, env = self._stanza(list(extra_args))
        return self.context.app.run_capturing_out(stanza, stderr_logger, env=env)

    def _stanza(self, extra_args):
        context = self.context
        k3d = context.k3d
        stanza = [k3d.k3d_file] + extra_args
        env = dict(os.environ)
        env["KUBECONFIG"] = str(k3d.kubeconfig)
        return stanza, env

    def _docker_ps(self, *filters, all_containers=True):
        args = ["docker", "ps", "-q"]
        if all_containers:
            args.append("-a")
        for f in filters:
            args += ["--filter", f]
        out = self.context.app.run_capturing_out(args, stderr_logger).strip()
        return [line for line in out.splitlines() if line]

    def _cluster_containers(self, profile, all_containers=True):
        filters = [f"label={K3D_CLUSTER_LABEL}={profile}"]
        return self._docker_ps(*filters, all_containers=all_containers)

    def _cluster_exists(self, profile):
        out = self.cmd_out("cluster", "list", "-o", "json")
        try:
            clusters = json.loads(out) if out.strip() else []
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Unable to parse `k3d cluster list -o json` output: {e}") from e
        return any(c.get("name") == profile for c in clusters)

    def _detect_provider(self, provider):
        context = self.context
        cmd_debug_logger = StripNL(proc_logger.debug)

        def probe(binary):
            try:
                context.app.run([binary, "info"], cmd_debug_logger, cmd_debug_logger).wait()
                return True
            except (FileNotFoundError, CalledProcessError) as e:
                logger.trace("%s is NOT functional", binary, exc_info=e)
                return False

        if provider and provider != "docker":
            raise RuntimeError(
                f"k3d only supports the 'docker' provider; got {provider!r}")

        if probe("docker"):
            logger.info("Docker is functional, selecting 'docker' as the k3d provider")
            return "docker"
        raise RuntimeError("Docker is not functional; k3d requires Docker.")

    def register(self,
                 k3d_version=None,
                 profile="default",
                 k8s_version=None,
                 node_image=None,
                 node_image_registry=DEFAULT_NODE_IMAGE_REGISTRY,
                 node_image_suffix=DEFAULT_NODE_IMAGE_SUFFIX,
                 keep_running=False,
                 start_fresh=False,
                 nodes=1,
                 control_plane_nodes=1,
                 provider=None,
                 config=None,
                 extra_port_mappings=None,
                 feature_gates=None,
                 runtime_config=None,
                 k3s_server_args=None,
                 k3s_agent_args=None):
        context = self.context

        context.app.register_plugin("kubeconfig")

        if not k8s_version and not node_image:
            msg = "Either k8s_version or node_image must be specified for k3d"
            logger.critical(msg)
            raise RuntimeError(msg)

        if nodes < 1:
            raise RuntimeError(f"k3d requires nodes >= 1, got {nodes}")
        if control_plane_nodes < 1:
            raise RuntimeError(f"k3d requires control_plane_nodes >= 1, got {control_plane_nodes}")
        if control_plane_nodes > nodes:
            raise RuntimeError(
                f"control_plane_nodes ({control_plane_nodes}) cannot exceed nodes ({nodes})")

        k8s_version_tuple = tuple(map(int, k8s_version.split("."))) if k8s_version else None

        if not k3d_version:
            k3d_version = self.get_latest_k3d_version()
            logger.info("No k3d version is specified, latest is %s", k3d_version)

        k3d_dl_file, _ = context.app.download_remote_file(
            logger,
            f"https://github.com/k3d-io/k3d/releases/download/v{k3d_version}/"
            f"k3d-{get_golang_os()}-{get_golang_machine()}",
            "bin",
        )
        os.chmod(k3d_dl_file, 0o500)

        self.k3d_dir = tempfile.TemporaryDirectory()
        context.app.register_cleanup(self.k3d_dir)
        k3d_file = Path(self.k3d_dir.name) / "k3d"
        k3d_file.symlink_to(k3d_dl_file)
        prepend_os_path(self.k3d_dir.name)

        version_out: str = context.app.run_capturing_out(
            [str(k3d_file), "version"], stderr_logger).strip()
        # "k3d version v5.7.4\nk3s version v1.30.4-k3s1 (default)"
        version = k3d_version
        for line in version_out.splitlines():
            line = line.strip()
            if line.startswith("k3d version "):
                version = line[len("k3d version "):].lstrip("v")
                break
        logger.info("Found k3d %s in %s", version, k3d_file)

        profile_dir = get_cache_dir("k3d")
        self.kubeconfig_dir = profile_dir / ".kube" / profile
        self.kubeconfig_dir.mkdir(parents=True, exist_ok=True)
        kubeconfig_path = self.kubeconfig_dir / "config"

        resolved_provider = self._detect_provider(provider)

        if not node_image and k8s_version:
            node_image = f"{node_image_registry}:v{k8s_version}{node_image_suffix}"

        context.globals.k3d = dict(
            version=version,
            k3d_file=str(k3d_file),
            profile=profile,
            k8s_version=k8s_version,
            k8s_version_tuple=k8s_version_tuple,
            node_image=node_image,
            node_image_registry=node_image_registry,
            node_image_suffix=node_image_suffix,
            start_fresh=start_fresh,
            keep_running=keep_running,
            nodes=nodes,
            control_plane_nodes=control_plane_nodes,
            provider=resolved_provider,
            config=config,
            extra_port_mappings=list(extra_port_mappings) if extra_port_mappings else [],
            feature_gates=dict(feature_gates) if feature_gates else {},
            runtime_config=dict(runtime_config) if runtime_config else {},
            k3s_server_args=list(k3s_server_args) if k3s_server_args else [],
            k3s_agent_args=list(k3s_agent_args) if k3s_agent_args else [],
            kubeconfig=str(kubeconfig_path),
            cmd=self.cmd,
            cmd_out=self.cmd_out,
        )
        context.kubeconfig.kubeconfig = context.k3d.kubeconfig

        logger.info("k3d kubeconfig is %s", context.k3d.kubeconfig)
        logger.info("k3d node image is %s", node_image)

    def _generate_cluster_config(self):
        k3d = self.context.k3d
        if k3d.config:
            return k3d.config

        agents = k3d.nodes - k3d.control_plane_nodes
        needs_config = (k3d.control_plane_nodes > 1
                        or agents > 0
                        or k3d.extra_port_mappings
                        or k3d.feature_gates
                        or k3d.runtime_config
                        or k3d.k3s_server_args
                        or k3d.k3s_agent_args)
        if not needs_config:
            return None

        doc = {
            "apiVersion": "k3d.io/v1alpha5",
            "kind": "Simple",
            "servers": k3d.control_plane_nodes,
            "agents": agents,
        }
        if k3d.node_image:
            doc["image"] = k3d.node_image

        if k3d.extra_port_mappings:
            ports = []
            for m in k3d.extra_port_mappings:
                host_port = m["hostPort"]
                container_port = m["containerPort"]
                protocol = m.get("protocol")
                spec = f"{host_port}:{container_port}"
                if protocol:
                    spec = f"{spec}/{protocol}"
                ports.append({"port": spec, "nodeFilters": ["loadbalancer"]})
            doc["ports"] = ports

        extra_args = []
        if k3d.feature_gates:
            gates = ",".join(f"{k}={'true' if v else 'false'}"
                             for k, v in k3d.feature_gates.items())
            extra_args.append({
                "arg": f"--kube-apiserver-arg=feature-gates={gates}",
                "nodeFilters": ["server:*"],
            })
        if k3d.runtime_config:
            cfg = ",".join(f"{k}={v}" for k, v in k3d.runtime_config.items())
            extra_args.append({
                "arg": f"--kube-apiserver-arg=runtime-config={cfg}",
                "nodeFilters": ["server:*"],
            })
        for arg in k3d.k3s_server_args:
            extra_args.append({"arg": arg, "nodeFilters": ["server:*"]})
        for arg in k3d.k3s_agent_args:
            extra_args.append({"arg": arg, "nodeFilters": ["agent:*"]})
        if extra_args:
            doc["options"] = {"k3s": {"extraArgs": extra_args}}

        return yaml.safe_dump(doc, sort_keys=False)

    def _write_cluster_config(self):
        config_yaml = self._generate_cluster_config()
        if not config_yaml:
            self._config_path = None
            return None
        self._config_path = Path(self.k3d_dir.name) / "cluster.yaml"
        self._config_path.write_text(config_yaml)
        logger.debug("Wrote k3d cluster config to %s:\n%s", self._config_path, config_yaml)
        return self._config_path

    def _export_kubeconfig(self):
        k3d = self.context.k3d
        config_yaml = self.cmd_out("kubeconfig", "get", k3d.profile)
        Path(k3d.kubeconfig).write_text(config_yaml)
        logger.info("Wrote kubeconfig for cluster %r to %s", k3d.profile, k3d.kubeconfig)

    def k3d_create(self):
        k3d = self.context.k3d
        args = ["cluster", "create", k3d.profile, "--wait", "--timeout", "120s"]
        config_path = self._write_cluster_config()
        if config_path:
            args += ["--config", str(config_path)]
            # When a config file is supplied, image/servers/agents are inside it;
            # passing them again on the CLI is rejected by k3d as a conflict.
        else:
            if k3d.node_image:
                args += ["--image", k3d.node_image]
        logger.info("Creating k3d cluster %r (image=%s, nodes=%d, control_plane_nodes=%d)",
                    k3d.profile, k3d.node_image, k3d.nodes, k3d.control_plane_nodes)
        self.cmd(*args)

    def k3d_delete(self):
        k3d = self.context.k3d
        logger.warning("Deleting k3d cluster %r", k3d.profile)
        try:
            self.cmd("cluster", "delete", k3d.profile)
        except CalledProcessError as e:
            logger.warning("k3d delete failed for %r: %s", k3d.profile, e)

    def k3d_stop(self):
        k3d = self.context.k3d
        if not self._cluster_exists(k3d.profile):
            logger.info("k3d cluster %r does not exist; nothing to stop", k3d.profile)
            return
        logger.info("Stopping k3d cluster %r", k3d.profile)
        try:
            self.cmd("cluster", "stop", k3d.profile)
        except CalledProcessError as e:
            logger.warning("k3d stop failed for %r: %s", k3d.profile, e)

    def k3d_start(self):
        k3d = self.context.k3d
        resumed = False
        if self._cluster_exists(k3d.profile):
            logger.info("Starting existing k3d cluster %r", k3d.profile)
            self.cmd("cluster", "start", k3d.profile, "--wait", "--timeout", "120s")
            resumed = True
        else:
            self.k3d_create()

        self._export_kubeconfig()

        # On resume, k3d's --wait may return before the apiserver is fully
        # answering /readyz over TLS. Poll until it does so downstream
        # plugins don't see SSL handshake failures.
        if resumed:
            self._wait_for_apiserver()

        context = self.context
        context.app.register_plugin("kubectl", version=k3d.k8s_version)

    def _wait_for_apiserver(self, timeout=120):
        with open(self.context.k3d.kubeconfig) as f:
            cfg = yaml.safe_load(f)
        server_url = cfg["clusters"][0]["cluster"]["server"]
        parsed = urlparse(server_url)
        ssl_ctx = ssl._create_unverified_context()

        logger.info("Waiting up to %ds for apiserver at %s to be ready",
                    timeout, server_url)
        deadline = time.monotonic() + timeout
        last_err = None
        while time.monotonic() < deadline:
            try:
                conn = http.client.HTTPSConnection(parsed.hostname, parsed.port,
                                                   context=ssl_ctx, timeout=5)
                try:
                    conn.request("GET", "/readyz")
                    resp = conn.getresponse()
                    status = resp.status
                    resp.read()
                finally:
                    conn.close()
                if status == 200:
                    logger.info("Apiserver at %s is ready", server_url)
                    return
                last_err = f"HTTP {status}"
            except (OSError, socket.error, ssl.SSLError,
                    http.client.HTTPException) as e:
                last_err = f"{type(e).__name__}: {e}"
            time.sleep(2)
        raise RuntimeError(
            f"Apiserver at {server_url} did not become ready within {timeout}s: {last_err}")

    def handle_start(self):
        k3d = self.context.k3d
        if k3d.start_fresh:
            if self._cluster_exists(k3d.profile):
                self.k3d_delete()
        self.k3d_start()

    def handle_shutdown(self):
        k3d = self.context.k3d
        if k3d.keep_running:
            logger.warning("Keeping k3d cluster %r running", k3d.profile)
            return
        self.k3d_stop()

    def __repr__(self):
        return "k3d Plugin"
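
The tag-filtering step inside `_resolve_latest_tag` can be tried without running `git`. The sketch below is a standalone variant, not the plugin's method: `latest_release_tag` is a hypothetical name, it takes the `git ls-remote` output as a string, it matches the `refs/tags/v` prefix explicitly instead of slicing at a fixed offset, and it raises `ValueError` where the plugin raises `RuntimeError`.

```python
from typing import List, Tuple

TAG_PREFIX = "refs/tags/v"


def latest_release_tag(ls_remote_output: str) -> str:
    """Return the highest plain <major>.<minor>.<patch> version in the output."""
    versions: List[Tuple[int, ...]] = []
    for line in ls_remote_output.splitlines():
        fields = line.split()
        # Expect "<sha> refs/tags/v1.2.3"; ignore anything else.
        if len(fields) != 2 or not fields[1].startswith(TAG_PREFIX):
            continue
        parts = fields[1][len(TAG_PREFIX):].split(".")
        # Pre-release tags such as 0.2.0-rc.1 fail one of these two checks.
        if len(parts) != 3 or not all(p.isdigit() for p in parts):
            continue
        versions.append(tuple(int(p) for p in parts))
    if not versions:
        raise ValueError("no numeric v<major>.<minor>.<patch> tags found")
    # Tuples compare numerically, so 5.10.0 sorts above 5.7.4.
    return ".".join(str(x) for x in max(versions))


sample = (
    "aaa\trefs/tags/v5.7.4\n"
    "bbb\trefs/tags/v5.10.0\n"
    "ccc\trefs/tags/v5.11.0-rc.1\n"
    "ddd\trefs/tags/v0.1.0-alpha\n"
)
print(latest_release_tag(sample))  # prints 5.10.0
```

Comparing integer tuples instead of strings is what keeps `5.10.0` ahead of `5.7.4`; a plain string sort would pick the wrong one.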