karellen / kubernator / 24580658204

17 Apr 2026 06:29PM UTC coverage: 77.939% (+1.0%) from 76.951%

push

github

web-flow
Refactor k8s apply: SSA test-ops, 409 retry, watch-based delete, imperative API (#102)

## Summary

- Harden the K8s apply path with SSA-derived JSON-patch `test` ops for
`/metadata/uid` and `/metadata/resourceVersion`, and wrap the SSA patch
branch in a 409-retry loop that recomputes the patch on conflict.
- Extend `K8SResource` with `list()`, `watch()`, and a `delete(wait=True)`
that waits via watch plus a `get()` recheck, and add a `resource_generator()`
extension seam on `KubernetesPlugin`.
- Add `ktor.k8s.resource(manifest)` — a fully-wired `K8SResource`
factory for imperative CRUD from `.kubernator.py` scripts, bypassing the
declarative apply lifecycle.
- Return the applied manifest as a fourth return value from `_apply_resource`
and its inner patch/create/delete callables, for use by external consumers.
- Move dump-file open/close from `app.py` into
`KubernetesPlugin.handle_apply`; fix dump-mode `patch_func` to seed
uid/RV on a copy of the local manifest by extracting them from the
patch's own test ops (via `jsonpointer`), so the simulated post-patch
manifest can be returned honestly.
- Rewrite `delete_create` and `resource_version_merge` integration tests
as single-phase using the new imperative API, replacing `kubectl`-based
phase-1 setup and the `TEST_PHASE` env re-invocation.
- Add a unit test for the 409 retry and integration tests for `watch()` and
`resource_generator`. Port `resource_version_merge` from minikube to
kind.
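
The first bullet's guard-and-retry idea can be illustrated with a minimal, self-contained sketch. This is not Kubernator's implementation: `Conflict`, `FakeServer`, `build_patch` and `patch_with_retry` are hypothetical names, the server is an in-memory stand-in, and only the `test` and top-level `replace` JSON-patch ops are modeled. It assumes a conflict surfaces as an exception (standing in for HTTP 409) and that the patch is rebuilt from a fresh read before each retry.

```python
import copy


class Conflict(Exception):
    """Stands in for an HTTP 409 response (hypothetical name)."""


def resolve(doc, pointer):
    """Resolve a simple JSON pointer such as '/metadata/uid' (no escaping)."""
    for key in pointer.strip("/").split("/"):
        doc = doc[key]
    return doc


def build_patch(live, desired_spec):
    """Guard the change with `test` ops on the identity the patch was computed against."""
    return [
        {"op": "test", "path": "/metadata/uid", "value": live["metadata"]["uid"]},
        {"op": "test", "path": "/metadata/resourceVersion",
         "value": live["metadata"]["resourceVersion"]},
        {"op": "replace", "path": "/spec", "value": desired_spec},
    ]


class FakeServer:
    """In-memory stand-in for an API server holding a single object."""

    def __init__(self, obj):
        self.obj = obj

    def get(self):
        return copy.deepcopy(self.obj)

    def patch(self, ops):
        # All `test` ops must pass before anything is modified.
        for op in ops:
            if op["op"] == "test" and resolve(self.obj, op["path"]) != op["value"]:
                raise Conflict(op["path"])
        # Only top-level `replace` paths (e.g. '/spec') are supported here.
        for op in ops:
            if op["op"] == "replace":
                self.obj[op["path"].strip("/")] = copy.deepcopy(op["value"])
        meta = self.obj["metadata"]
        meta["resourceVersion"] = str(int(meta["resourceVersion"]) + 1)
        return self.get()


def patch_with_retry(server, desired_spec, live, max_attempts=3):
    """Retry on conflict, recomputing the patch from a fresh read each time."""
    for attempt in range(1, max_attempts + 1):
        try:
            return server.patch(build_patch(live, desired_spec)), attempt
        except Conflict:
            if attempt == max_attempts:
                raise
            live = server.get()  # refresh uid/resourceVersion, then recompute


server = FakeServer({"metadata": {"uid": "u-1", "resourceVersion": "7"},
                     "spec": {"replicas": 1}})
stale = server.get()
server.obj["metadata"]["resourceVersion"] = "8"  # simulate a concurrent writer
result, attempts = patch_with_retry(server, {"replicas": 3}, stale)
```

In this sketch the first attempt fails its `resourceVersion` test because of the simulated concurrent write, and the second attempt succeeds after the patch is rebuilt against the refreshed object. Testing `/metadata/uid` as well guards against patching an object that was deleted and recreated under the same name.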

## Test plan

- [x] `pyb -vX run_unit_tests` (32 tests, all green — includes new
`k8s_apply_retry_tests`)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=delete_create_tests.py` (single-phase, kind,
immutable-field delete+recreate path)
- [x] `pyb -vX run_integration_tests -P
integrationtest_file_glob=resource_version_merge_tests.py`
(single-phase, kind, dump mode patch verification)
- [ ] Full `pyb -vX run_integration_tests` suite via CI across the
Python 3.10–3.14 matrix

614 of 976 branches covered (62.91%)

Branch coverage included in aggregate %.

137 of 163 new or added lines in 4 files covered. (84.05%)

5 existing lines in 3 files now uncovered.

3039 of 3711 relevant lines covered (81.89%)

4.09 hits per line

Source File

/src/main/python/kubernator/app.py (85.98% covered)
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import argparse
5✔
20
import datetime
5✔
21
import importlib
5✔
22
import logging
5✔
23
import pkgutil
5✔
24
import sys
5✔
25
import urllib.parse
5✔
26
from collections import deque
5✔
27
from collections.abc import MutableMapping, Callable
5✔
28
from contextlib import closing
5✔
29
from pathlib import Path
5✔
30
from shutil import rmtree
5✔
31
from typing import Optional, Union
5✔
32

33
import yaml
5✔
34

35
import kubernator
5✔
36
from kubernator.api import (KubernatorPlugin, Globs, scan_dir, PropertyDict, config_as_dict, config_parent,
5✔
37
                            download_remote_file, load_remote_file, Repository, StripNL, jp, get_app_cache_dir,
38
                            get_cache_dir, install_python_k8s_client)
39
from kubernator.proc import run, run_capturing_out, run_pass_through_capturing
5✔
40

41
TRACE = 5
5✔
42

43

44
def trace(self, msg, *args, **kwargs):
5✔
45
    """
46
    Log 'msg % args' with severity 'TRACE'.
47

48
    To pass exception information, use the keyword argument exc_info with
49
    a true value, e.g.
50

51
    logger.trace("Houston, we have a %s", "interesting problem", exc_info=1)
52
    """
53
    if self.isEnabledFor(TRACE):
5✔
54
        self._log(TRACE, msg, args, **kwargs)
5✔
55

56

57
logging.addLevelName(TRACE, "TRACE")
5✔
58
logging.Logger.trace = trace
5✔
59
logger = logging.getLogger("kubernator")
5✔
60

61
try:
5✔
62
    del (yaml.resolver.Resolver.yaml_implicit_resolvers["="])
5✔
63
except KeyError:
5✔
64
    pass
5✔
65

66

67
def define_arg_parse():
5✔
68
    parser = argparse.ArgumentParser(prog="kubernator",
5✔
69
                                     description="Kubernetes Provisioning Tool",
70
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
71
    g = parser.add_mutually_exclusive_group()
5✔
72
    g.add_argument("--version", action="version", version=kubernator.__version__,
5✔
73
                   help="print version and exit")
74
    g.add_argument("--clear-cache", action="store_true",
5✔
75
                   help="clear cache and exit")
76
    g.add_argument("--clear-k8s-cache", action="store_true",
5✔
77
                   help="clear Kubernetes Client cache and exit")
78
    g.add_argument("--pre-cache-k8s-client", action="extend", nargs="+", type=int,
5✔
79
                   help="download specified K8S client library major(!) version(s) and exit")
80
    parser.add_argument("--pre-cache-k8s-client-no-patch", action="store_true", default=None,
5✔
81
                        help="do not patch the k8s client being pre-cached")
82
    parser.add_argument("--log-format", choices=["human", "json"], default="human",
5✔
83
                        help="whether to log for human or machine consumption")
84
    parser.add_argument("--log-file", type=str, default=None,
5✔
85
                        help="where to log, defaults to `stderr`")
86
    parser.add_argument("-v", "--verbose", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"],
5✔
87
                        default="INFO", help="how verbose do you want Kubernator to be")
88
    parser.add_argument("-f", "--file", type=str, default=None,
5✔
89
                        help="where to generate results, if necessary, defaults to `stdout`")
90
    parser.add_argument("-o", "--output-format", choices=["json", "json-pretty", "yaml"], default="yaml",
5✔
91
                        help="in what format to generate results")
92
    parser.add_argument("-p", "--path", dest="path", default=".",
5✔
93
                        type=Path, help="path to start processing")
94
    #    parser.add_argument("--pre-start-script", default=None, type=Path,
95
    #                        help="location of the pre-start script")
96
    #    parser.add_argument("--disconnected", action="store_true", default=False,
97
    #                        help="do not actually connect to the target Kubernetes")
98
    #    parser.add_argument("--k8s-version", type=str, default=None,
99
    #                        help="specify a version of Kubernetes when operating in the disconnected mode")
100
    parser.add_argument("--yes", action="store_false", default=True, dest="dry_run",
5✔
101
                        help="actually make destructive changes")
102
    parser.add_argument("command", nargs="?", choices=["dump", "apply"], default="dump",
5✔
103
                        help="whether to dump the proposed changes to the output or to apply them")
104
    return parser
5✔
105

106

107
def init_logging(verbose, output_format, output_file):
5✔
108
    root_log = logging.root
5✔
109

110
    handler = logging.StreamHandler(output_file)
5✔
111
    root_log.addHandler(handler)
5✔
112

113
    if output_format == "human":
5✔
114
        if handler.stream.isatty():
5!
115
            import coloredlogs
×
116
            fmt_cls = coloredlogs.ColoredFormatter
×
117

118
        else:
119
            fmt_cls = logging.Formatter
5✔
120

121
        def formatTime(record, datefmt=None):
5✔
122
            return datetime.datetime.fromtimestamp(record.created).isoformat()
5✔
123

124
        formatter = fmt_cls("%(asctime)s %(name)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
5✔
125
        formatter.formatTime = formatTime
5✔
126
    else:
127
        import json_log_formatter
5✔
128

129
        class JSONFormatter(json_log_formatter.JSONFormatter):
5✔
130
            def json_record(self, message, extra, record: logging.LogRecord):
5✔
131
                extra = super(JSONFormatter, self).json_record(message, extra, record)
5✔
132
                extra["ts"] = datetime.datetime.fromtimestamp(record.created)
5✔
133
                extra["name"] = record.name
5✔
134
                extra["level"] = record.levelname
5✔
135
                extra["fn"] = record.filename
5✔
136
                extra["ln"] = record.lineno
5✔
137
                del extra["time"]
5✔
138
                return extra
5✔
139

140
        formatter = JSONFormatter()
5✔
141

142
    handler.setFormatter(formatter)
5✔
143
    logger.setLevel(logging._nameToLevel[verbose])
5✔
144

145

146
class App(KubernatorPlugin):
5✔
147
    _name = "app"
5✔
148

149
    def __init__(self, args):
5✔
150
        self.args = args
5✔
151
        path = args.path.absolute()
5✔
152

153
        global_context = PropertyDict()
5✔
154
        global_context.globals = global_context
5✔
155
        context = PropertyDict(_parent=global_context)
5✔
156
        context._plugins = []
5✔
157

158
        self._top_level_context = context
5✔
159
        self.context = context
5✔
160
        self._top_dir_context = PropertyDict(_parent=self.context)
5✔
161

162
        self.repos: MutableMapping[Repository, Repository] = dict()
5✔
163
        self.path_q: deque[tuple[PropertyDict, Path]] = deque(((self._top_dir_context, path),))
5✔
164

165
        self._new_paths: list[tuple[PropertyDict, Path]] = []
5✔
166

167
        self._cleanups = []
5✔
168
        self._plugin_types = {}
5✔
169

170
    def __enter__(self):
5✔
171
        return self
5✔
172

173
    def __exit__(self, exc_type, exc_val, exc_tb):
5✔
174
        self.cleanup()
5✔
175

176
    def run(self):
5✔
177
        logger.info("Starting Kubernator version %s", kubernator.__version__)
5✔
178

179
        self.register_plugin(self)
5✔
180

181
        try:
5✔
182
            try:
5✔
183
                while True:
5✔
184
                    cwd = self.next()
5✔
185
                    if not cwd:
5✔
186
                        logger.debug("No paths left to traverse")
5✔
187
                        break
5✔
188

189
                    context = self.context
5✔
190

191
                    logger.debug("Inspecting directory %s", self._display_path(cwd))
5✔
192
                    self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, None, cwd)
5✔
193

194
                    if (ktor_py := (cwd / ".kubernator.py")).exists():
5✔
195
                        self._run_handlers(KubernatorPlugin.handle_before_script, False, context, None, cwd)
5✔
196

197
                        for h in self.context._plugins:
5✔
198
                            h.set_context(context)
5✔
199

200
                        self._exec_ktor(ktor_py)
5✔
201

202
                        for h in self.context._plugins:
5✔
203
                            h.set_context(None)
5✔
204

205
                        self._run_handlers(KubernatorPlugin.handle_after_script, True, context, None, cwd)
5✔
206

207
                    self._run_handlers(KubernatorPlugin.handle_after_dir, True, context, None, cwd)
5✔
208

209
                self.context = self._top_dir_context
5✔
210
                context = self.context
5✔
211

212
                self._run_handlers(KubernatorPlugin.handle_apply, True, context, None)
5✔
213

214
                self._run_handlers(KubernatorPlugin.handle_verify, True, context, None)
5✔
215
            finally:
216
                self.context = self._top_dir_context
5✔
217
                context = self.context
5✔
218
                self._run_handlers(KubernatorPlugin.handle_shutdown, True, context, None)
5✔
219
        except:  # noqa E722
5✔
220
            raise
5✔
221
        else:
222
            self.context = self._top_dir_context
5✔
223
            context = self.context
5✔
224
            self._run_handlers(KubernatorPlugin.handle_summary, True, context, None)
5✔
225

226
    def discover_plugins(self):
5✔
227
        importlib.invalidate_caches()
5✔
228
        search_path = Path(kubernator.__path__[0], "plugins")
5✔
229
        [importlib.import_module(name)
5✔
230
         for finder, name, is_pkg in
231
         pkgutil.iter_modules([str(search_path)], "kubernator.plugins.")]
232

233
        for plugin in KubernatorPlugin.__subclasses__():
5✔
234
            if plugin._name in self._plugin_types:
5!
235
                logger.warning("Plugin named %r in %r is already reserved by %r and will be ignored",
×
236
                               plugin._name,
237
                               plugin,
238
                               self._plugin_types[plugin._name])
239
            else:
240
                logger.info("Plugin %r discovered in %r", plugin._name, plugin)
5✔
241
                self._plugin_types[plugin._name] = plugin
5✔
242

243
    def assert_plugin(self, plugin: Union[KubernatorPlugin, type[KubernatorPlugin], str],
5✔
244
                      requester: Union[KubernatorPlugin, type[KubernatorPlugin], str]):
245
        context = self.context
5✔
246
        if isinstance(plugin, str):
5!
247
            try:
5✔
248
                plugin_type = self._plugin_types[plugin]
5✔
249
            except KeyError:
×
250
                logger.critical("No known plugin with the name %r", plugin)
×
251
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
252
        elif isinstance(plugin, type):
×
253
            plugin_type = plugin
×
254
        else:
255
            plugin_type = type(plugin)
×
256

257
        for p in context._plugins:
5!
258
            if p._name == plugin_type._name:
5✔
259
                return
5✔
260

261
        raise RuntimeError("Plugin %s requires plugin %s to be initialized" %
×
262
                           (requester._name if hasattr(requester, "_name") else requester,
263
                            plugin))
264

265
    def register_plugin(self, plugin: Union[KubernatorPlugin, type, str], **kwargs):
5✔
266
        context = self.context
5✔
267
        if isinstance(plugin, str):
5✔
268
            try:
5✔
269
                plugin_obj = self._plugin_types[plugin]()
5✔
270
            except KeyError:
×
271
                logger.critical("No known plugin with the name %r", plugin)
×
272
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
273
        elif isinstance(plugin, type):
5!
274
            plugin_obj = plugin()
×
275
        else:
276
            plugin_obj = plugin
5✔
277

278
        for p in context._plugins:
5✔
279
            if p._name == plugin_obj._name:
5✔
280
                logger.info("Plugin with name %r already registered, skipping", p._name)
5✔
281
                return
5✔
282

283
        logger.info("Registering plugin %r via %r", plugin_obj._name, plugin_obj)
5✔
284

285
        # Register
286
        self._run_handlers(KubernatorPlugin.register, False, context, plugin_obj, **kwargs)
5✔
287

288
        context._plugins.append(plugin_obj)
5✔
289

290
        # Init
291
        self._run_handlers(KubernatorPlugin.handle_init, False, context, plugin_obj)
5✔
292

293
        # Start
294
        self._run_handlers(KubernatorPlugin.handle_start, False, context, plugin_obj)
5✔
295

296
        # If we're already processing a directory
297
        if "app" in self.context and "cwd" in self.context.app and self.context.app.cwd:
5✔
298
            cwd = self.context.app.cwd
5✔
299
            self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, plugin_obj, cwd)
5✔
300

301
            # If we're already in the script (TODO: is it possible to NOT be in a script?)
302
            if "script" in self.context.app:
5!
303
                self._run_handlers(KubernatorPlugin.handle_before_script, False, context, plugin_obj, cwd)
5✔
304

305
    def _run_handlers(self, __f, __reverse, __context, __plugin, *args, **kwargs):
5✔
306
        f_name = __f.__name__
5✔
307

308
        def run(h):
5✔
309
            h_f = getattr(h, f_name, None)
5✔
310
            if h_f:
5!
311
                logger.trace("Running %r handler on %r with %r, %r", f_name, h, args, kwargs)
5✔
312
                h_f(*args, **kwargs)
5✔
313

314
        if __plugin:
5✔
315
            __plugin.set_context(__context)
5✔
316
            run(__plugin)
5✔
317
        else:
318
            self._set_plugin_context(__reverse, __context, run)
5✔
319

320
    def _set_plugin_context(self, reverse, context, run):
5✔
321
        for h in list(self.context._plugins if not reverse else reversed(self.context._plugins)):
5✔
322
            h.set_context(context)
5✔
323
            run(h)
5✔
324
            h.set_context(None)
5✔
325

326
    def _exec_ktor(self, ktor_py: Path):
5✔
327
        ktor_py_display_path = self._display_path(ktor_py)
5✔
328
        logger.debug("Executing %s", ktor_py_display_path)
5✔
329
        with open(ktor_py, "rb") as f:
5✔
330
            source = f.read()
5✔
331
        co = compile(source, ktor_py_display_path, "exec")
5✔
332
        globs = {"ktor": self.context,
5✔
333
                 "logger": logger.getChild("script")
334
                 }
335
        exec(co, globs)
5✔
336
        logger.debug("Executed %r", ktor_py_display_path)
5✔
337

338
    def next(self) -> Optional[Path]:
5✔
339
        path_queue: deque[tuple[PropertyDict, Path]] = self.path_q
5✔
340
        if path_queue:
5✔
341
            self.context, path = path_queue.pop()
5✔
342
            return path
5✔
343

344
    def register_cleanup(self, h):
5✔
345
        if not hasattr(h, "cleanup"):
5!
346
            raise RuntimeError("cleanup handler has no cleanup attribute")
×
347
        self._cleanups.append(h)
5✔
348

349
    def cleanup(self):
5✔
350
        for h in self._cleanups:
5✔
351
            h.cleanup()
5✔
352

353
    def register(self, **kwargs):
5✔
354
        self.discover_plugins()
5✔
355

356
    def handle_init(self):
5✔
357
        context = self.context
5✔
358

359
        context.globals.common = dict()
5✔
360
        context.globals.app = dict(display_path=self._display_path,
5✔
361
                                   args=self.args,
362
                                   repository_credentials_provider=self._repository_credentials_provider,
363
                                   walk_remote=self.walk_remote,
364
                                   walk_local=self.walk_local,
365
                                   register_plugin=self.register_plugin,
366
                                   assert_plugin=self.assert_plugin,
367
                                   config_as_dict=config_as_dict,
368
                                   config_parent=config_parent,
369
                                   download_remote_file=download_remote_file,
370
                                   load_remote_file=load_remote_file,
371
                                   register_cleanup=self.register_cleanup,
372
                                   jp=jp,
373
                                   run=self._run,
374
                                   run_capturing_out=self._run_capturing_out,
375
                                   run_passthrough_capturing=self._run_passthrough_capturing,
376
                                   repository=self.repository,
377
                                   StripNL=StripNL,
378
                                   default_includes=Globs(["*"], True),
379
                                   default_excludes=Globs([".*"], True),
380
                                   )
381
        context.app = dict(_repository_credentials_provider=None,
5✔
382
                           default_includes=Globs(context.app.default_includes),
383
                           default_excludes=Globs(context.app.default_excludes),
384
                           includes=Globs(context.app.default_includes),
385
                           excludes=Globs(context.app.default_excludes),
386
                           )
387

388
    def handle_before_dir(self, cwd: Path):
5✔
389
        context = self.context
5✔
390
        app = context.app
5✔
391
        app.includes = Globs(app.default_includes)
5✔
392
        app.excludes = Globs(app.default_excludes)
5✔
393
        app.cwd = cwd
5✔
394
        self._new_paths = []
5✔
395

396
    def handle_before_script(self, cwd: Path):
5✔
397
        context = self.context
5✔
398
        app = context.app
5✔
399
        app.script = (cwd / ".kubernator.py")
5✔
400

401
    def handle_after_script(self, cwd: Path):
5✔
402
        context = self.context
5✔
403
        app = context.app
5✔
404
        del app.script
5✔
405

406
    def handle_after_dir(self, cwd: Path):
5✔
407
        context = self.context
5✔
408
        app = context.app
5✔
409

410
        for f in scan_dir(logger, cwd, lambda d: d.is_dir(), app.excludes, app.includes):
5✔
411
            self._new_paths.append((PropertyDict(_parent=context), f))
5✔
412

413
        self.path_q.extend(reversed(self._new_paths))
5✔
414

415
        del app.cwd
5✔
416

417
    def repository(self, repo):
5✔
418
        repository = Repository(repo, self._repo_cred_augmentation)
5✔
419
        if repository in self.repos:
5!
420
            repository = self.repos[repository]
×
421
        else:
422
            self.repos[repository] = repository
5✔
423
            repository.init(logger, self.context)
5✔
424
            self.register_cleanup(repository)
5✔
425

426
        return repository
5✔
427

428
    def walk_local(self, *paths: Union[Path, str, bytes], keep_context=False):
5✔
429
        for path in paths:
×
430
            p = Path(path)
×
431
            if not p.is_absolute():
×
432
                p = self.context.app.cwd / p
×
433
            self._add_local(p, keep_context)
×
434

435
    def walk_remote(self, repo, *path_prefixes: Union[Path, str, bytes], keep_context=False):
5✔
436
        repository = self.repository(repo)
5✔
437

438
        if path_prefixes:
5!
439
            for path_prefix in path_prefixes:
×
440
                path = Path(path_prefix)
×
441
                if path.is_absolute():
×
442
                    path = Path(*path.parts[1:])
×
443
                self._add_local(repository.local_dir / path, keep_context)
×
444
        else:
445
            self._add_local(repository.local_dir, keep_context)
5✔
446

447
    def set_context(self, context):
5✔
448
        # We are managing the context for everyone so we don't actually set it anywhere
449
        pass
5✔
450

451
    def _add_local(self, path: Path, keep_context=False):
5✔
452
        logger.info("Adding %s to the plan%s", self._display_path(path),
5✔
453
                    " (in parent context)" if keep_context else "")
454
        self._new_paths.append((self.context if keep_context else PropertyDict(_parent=self.context), path))
5✔
455

456
    def _repository_credentials_provider(self,
5✔
457
                                         provider: Optional[
458
                                             Callable[[urllib.parse.SplitResult], tuple[
459
                                                 Optional[str], Optional[str], Optional[str]]]]):
460
        self.context.app._repository_credentials_provider = provider
×
461

462
    def _repo_cred_augmentation(self, url):
5✔
463
        rcp = self.context.app._repository_credentials_provider
5✔
464
        if not rcp:
5!
465
            return url
5✔
466

467
        scheme, username, password = rcp(url)
×
468
        return urllib.parse.SplitResult(scheme if scheme else url.scheme,
×
469
                                        ((
470
                                             username +
471
                                             (
472
                                                 ":" + password if password else "") + "@"
473
                                             if username else "") + url.hostname)
474
                                        if username or password
475
                                        else url.netloc,
476
                                        url.path, url.query, url.fragment)
477

478
    def _path_to_repository(self, path: Path) -> Repository:
5✔
479
        for r in self.repos.values():
5✔
480
            if path.is_relative_to(r.local_dir):
5!
481
                return r
5✔
482

483
    def _display_path(self, path: Path) -> str:
5✔
484
        repo = self._path_to_repository(path)
5✔
485
        return "<%s> %s" % (repo.url_str, path) if repo else str(path)
5✔
486

487
    def _run(self, *args, **kwargs):
5✔
488
        return run(*args, **kwargs)
5✔
489

490
    def _run_capturing_out(self, *args, **kwargs):
5✔
491
        return run_capturing_out(*args, **kwargs)
5✔
492

493
    def _run_passthrough_capturing(self, *args, **kwargs):
5✔
494
        return run_pass_through_capturing(*args, **kwargs)
5✔
495

496
    def __repr__(self):
5✔
497
        return "Kubernator"
5✔
498

499

500
def clear_cache():
5✔
501
    cache_dir = get_app_cache_dir()
×
502
    _clear_cache("Clearing application cache at %s", cache_dir)
×
503

504

505
def clear_k8s_cache():
5✔
506
    cache_dir = get_cache_dir("python")
5✔
507
    _clear_cache("Clearing Kubernetes Client cache at %s", cache_dir)
5✔
508

509

510
def _clear_cache(msg, cache_dir):
5✔
511
    logger.info(msg, cache_dir)
5✔
512
    if cache_dir.exists():
5!
513
        rmtree(cache_dir)
5✔
514

515

516
def pre_cache_k8s_clients(*versions, disable_patching=False):
5✔
517
    proc_logger = logger.getChild("proc")
5✔
518
    stdout_logger = StripNL(proc_logger.info)
5✔
519
    stderr_logger = StripNL(proc_logger.warning)
5✔
520

521
    for v in versions:
5✔
522
        logger.info("Caching K8S client library ~=v%s.0%s...", v,
5✔
523
                    " (no patches)" if disable_patching else "")
524
        install_python_k8s_client(run_pass_through_capturing, v, logger, stdout_logger, stderr_logger, disable_patching)
5✔
525

526

527
def main():
5✔
528
    argparser = define_arg_parse()
5✔
529
    args = argparser.parse_args()
5✔
530
    if not args.pre_cache_k8s_client and args.pre_cache_k8s_client_no_patch is not None:
5!
531
        argparser.error("--pre-cache-k8s-client-no-patch can only be used with --pre-cache-k8s-client")
×
532

533
    if args.log_file:
5✔
534
        log_stream = open(args.log_file, "w")
5✔
535
    else:
536
        log_stream = sys.stderr
5✔
537

538
    init_logging(args.verbose, args.log_format, log_stream)
5✔
539

540
    try:
5✔
541
        if args.clear_cache:
5!
542
            clear_cache()
×
543
            return 0
×
544

545
        if args.clear_k8s_cache:
5✔
546
            clear_k8s_cache()
5✔
547
            return 0
5✔
548

549
        if args.pre_cache_k8s_client:
5✔
550
            pre_cache_k8s_clients(*args.pre_cache_k8s_client,
5✔
551
                                  disable_patching=args.pre_cache_k8s_client_no_patch)
552
            return 0
5✔
553

554
        with App(args) as app:
5✔
555
            app.run()
5✔
556
    except SystemExit as e:
5✔
557
        return e.code
×
558
    except Exception as e:
5✔
559
        logger.fatal("Kubernator terminated with an error: %s", e, exc_info=e)
5✔
560
        return 1
5✔
561
    else:
562
        logger.info("Kubernator terminated successfully")
5✔
563
    finally:
564
        try:
5✔
565
            logging.shutdown()
5✔
566
        finally:
567
            if log_stream != sys.stderr:
5✔
568
                with closing(log_stream):
5✔
569
                    pass
5✔