• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 24637370495

19 Apr 2026 07:35PM UTC coverage: 82.036% (+0.04%) from 81.992%
24637370495

push

github

web-flow
Add project plugin v1: scope, ownership, cleanup (#103)

## Summary

Introduces a **project plugin** that adds hierarchical scope, ownership
tracking, and cleanup to Kubernator. Kubernator today is stateless —
every run reconciles everything it discovers and cannot tell whether a
resource it sees was applied by us, by someone else, or by a previous
run that has since dropped the manifest. Project v1 fixes all three:

- **Scope.** A user registers the project plugin once at the root with a
name; sub-directories extend the project via ``ktor.app.project =
\"<segment>\"`` (dot-joined top-down). CLI flags ``-I``/``-X`` scope a
run to specific sub-trees (repeatable, combineable: ``candidates = known
∩ includes`` then ``in_scope = candidates − excludes``).
- **Ownership.** Every applied resource is stamped with a
``kubernator.io/project`` annotation carrying the composed path. A
single gzipped JSON Secret per top-level project
(``<state_namespace>/kubernator-project-<sha1(root)[:12]>``) records the
idents of resources we own.
- **Cleanup.** On each run, the prior Secret's idents are diffed against
the current in-scope manifests; when ``cleanup=True``, resources that
used to be ours but are no longer in scope get deleted. Resources that
moved between sub-projects within the run's in-scope set are detected
and preserved (dedup on identity, ignoring project).

### Crash resilience

The Secret write is two-phase: before ``handle_apply`` runs, the new
intent is recorded as ``pending`` with ``finalized=false`` while
``resources`` still holds the prior finalized set. After
``handle_cleanup`` succeeds, Finalize commits the merged ``resources``
and clears ``pending`` with ``finalized=true``. A crash between the two
phases leaves ``finalized=false``; the next run's cleanup takes the
conservative union (``resources ∪ pending``) so no previously-owned
resource ever slips past cleanup.

### Concurrent-run protection

A ``coordination.k8s.io/v1.Lease`` named
``kubernator... (continued)

1078 of 1494 branches covered (72.16%)

Branch coverage included in aggregate %.

443 of 561 new or added lines in 5 files covered. (78.97%)

3 existing lines in 1 file now uncovered.

4434 of 5225 relevant lines covered (84.86%)

4.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.75
/src/main/python/kubernator/app.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import argparse
5✔
20
import datetime
5✔
21
import importlib
5✔
22
import logging
5✔
23
import pkgutil
5✔
24
import re
5✔
25
import sys
5✔
26
import urllib.parse
5✔
27
from collections import deque
5✔
28
from collections.abc import MutableMapping, Callable
5✔
29
from contextlib import closing
5✔
30
from pathlib import Path
5✔
31
from shutil import rmtree
5✔
32
from typing import Optional, Union
5✔
33

34
import yaml
5✔
35

36
import kubernator
5✔
37
from kubernator.api import (KubernatorPlugin, Globs, scan_dir, PropertyDict, config_as_dict, config_parent,
5✔
38
                            ContextProperty,
39
                            download_remote_file, load_remote_file, Repository, StripNL, jp, get_app_cache_dir,
40
                            get_cache_dir, install_python_k8s_client)
41
from kubernator.proc import run, run_capturing_out, run_pass_through_capturing
5✔
42

43
TRACE = 5
5✔
44

45

46
def trace(self, msg, *args, **kwargs):
5✔
47
    """
48
    Log 'msg % args' with severity 'TRACE'.
49

50
    To pass exception information, use the keyword argument exc_info with
51
    a true value, e.g.
52

53
    logger.trace("Houston, we have a %s", "interesting problem", exc_info=1)
54
    """
55
    if self.isEnabledFor(TRACE):
5✔
56
        self._log(TRACE, msg, args, **kwargs)
5✔
57

58

59
logging.addLevelName(5, "TRACE")
5✔
60
logging.Logger.trace = trace
5✔
61
logger = logging.getLogger("kubernator")
5✔
62

63
try:
5✔
64
    del (yaml.resolver.Resolver.yaml_implicit_resolvers["="])
5✔
65
except KeyError:
5✔
66
    pass
5✔
67

68

69
_PROJECT_SEGMENT_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
5✔
70

71

72
def define_arg_parse():
5✔
73
    parser = argparse.ArgumentParser(prog="kubernator",
5✔
74
                                     description="Kubernetes Provisioning Tool",
75
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
76
    g = parser.add_mutually_exclusive_group()
5✔
77
    g.add_argument("--version", action="version", version=kubernator.__version__,
5✔
78
                   help="print version and exit")
79
    g.add_argument("--clear-cache", action="store_true",
5✔
80
                   help="clear cache and exit")
81
    g.add_argument("--clear-k8s-cache", action="store_true",
5✔
82
                   help="clear Kubernetes Client cache and exit")
83
    g.add_argument("--pre-cache-k8s-client", action="extend", nargs="+", type=int,
5✔
84
                   help="download specified K8S client library major(!) version(s) and exit")
85
    parser.add_argument("--pre-cache-k8s-client-no-patch", action="store_true", default=None,
5✔
86
                        help="do not patch the k8s client being pre-cached")
87
    parser.add_argument("--log-format", choices=["human", "json"], default="human",
5✔
88
                        help="whether to log for human or machine consumption")
89
    parser.add_argument("--log-file", type=str, default=None,
5✔
90
                        help="where to log, defaults to `stderr`")
91
    parser.add_argument("-v", "--verbose", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"],
5✔
92
                        default="INFO", help="how verbose do you want Kubernator to be")
93
    parser.add_argument("-f", "--file", type=str, default=None,
5✔
94
                        help="where to generate results, if necessary, defaults to `stdout`")
95
    parser.add_argument("-o", "--output-format", choices=["json", "json-pretty", "yaml"], default="yaml",
5✔
96
                        help="in what format to generate results")
97
    parser.add_argument("-p", "--path", dest="path", default=".",
5✔
98
                        type=Path, help="path to start processing")
99
    #    parser.add_argument("--pre-start-script", default=None, type=Path,
100
    #                        help="location of the pre-start script")
101
    #    parser.add_argument("--disconnected", action="store_true", default=False,
102
    #                        help="do not actually connect to the target Kubernetes")
103
    #    parser.add_argument("--k8s-version", type=str, default=None,
104
    #                        help="specify a version of Kubernetes when operating in the disconnected mode")
105
    parser.add_argument("--yes", action="store_false", default=True, dest="dry_run",
5✔
106
                        help="actually make destructive changes")
107
    parser.add_argument("-I", "--include-project", action="append", default=[],
5✔
108
                        metavar="PROJECT", dest="include_project",
109
                        help="limit the run to the named project and its sub-tree "
110
                             "(repeatable; no-op unless the project plugin is registered)")
111
    parser.add_argument("-X", "--exclude-project", action="append", default=[],
5✔
112
                        metavar="PROJECT", dest="exclude_project",
113
                        help="exclude the named project and its sub-tree "
114
                             "(repeatable; no-op unless the project plugin is registered)")
115
    parser.add_argument("command", nargs="?", choices=["dump", "apply"], default="dump",
5✔
116
                        help="whether to dump the proposed changes to the output or to apply them")
117
    return parser
5✔
118

119

120
def init_logging(verbose, output_format, output_file):
5✔
121
    root_log = logging.root
5✔
122

123
    handler = logging.StreamHandler(output_file)
5✔
124
    root_log.addHandler(handler)
5✔
125

126
    if output_format == "human":
5✔
127
        if handler.stream.isatty():
5!
128
            import coloredlogs
×
129
            fmt_cls = coloredlogs.ColoredFormatter
×
130

131
        else:
132
            fmt_cls = logging.Formatter
5✔
133

134
        def formatTime(record, datefmt=None):
5✔
135
            return datetime.datetime.fromtimestamp(record.created).isoformat()
5✔
136

137
        formatter = fmt_cls("%(asctime)s %(name)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
5✔
138
        formatter.formatTime = formatTime
5✔
139
    else:
140
        import json_log_formatter
5✔
141

142
        class JSONFormatter(json_log_formatter.JSONFormatter):
5✔
143
            def json_record(self, message, extra, record: logging.LogRecord):
5✔
144
                extra = super(JSONFormatter, self).json_record(message, extra, record)
5✔
145
                extra["ts"] = datetime.datetime.fromtimestamp(record.created)
5✔
146
                extra["name"] = record.name
5✔
147
                extra["level"] = record.levelname
5✔
148
                extra["fn"] = record.filename
5✔
149
                extra["ln"] = record.lineno
5✔
150
                del extra["time"]
5✔
151
                return extra
5✔
152

153
        formatter = JSONFormatter()
5✔
154

155
    handler.setFormatter(formatter)
5✔
156
    logger.setLevel(logging._nameToLevel[verbose])
5✔
157

158

159
class App(KubernatorPlugin):
5✔
160
    _name = "app"
5✔
161

162
    def __init__(self, args):
5✔
163
        self.args = args
5✔
164
        path = args.path.absolute()
5✔
165

166
        global_context = PropertyDict()
5✔
167
        global_context.globals = global_context
5✔
168
        context = PropertyDict(_parent=global_context)
5✔
169
        context._plugins = []
5✔
170

171
        self._top_level_context = context
5✔
172
        self.context = context
5✔
173
        self._top_dir_context = PropertyDict(_parent=self.context)
5✔
174

175
        self.repos: MutableMapping[Repository, Repository] = dict()
5✔
176
        self.path_q: deque[tuple[PropertyDict, Path]] = deque(((self._top_dir_context, path),))
5✔
177

178
        self._new_paths: list[tuple[PropertyDict, Path]] = []
5✔
179

180
        self._cleanups = []
5✔
181
        self._plugin_types = {}
5✔
182
        self._project_segments: dict[PropertyDict, str] = {}
5✔
183

184
    def __enter__(self):
5✔
185
        return self
5✔
186

187
    def __exit__(self, exc_type, exc_val, exc_tb):
5✔
188
        self.cleanup()
5✔
189

190
    def run(self):
5✔
191
        logger.info("Starting Kubernator version %s", kubernator.__version__)
5✔
192

193
        self.register_plugin(self)
5✔
194

195
        try:
5✔
196
            try:
5✔
197
                while True:
5✔
198
                    cwd = self.next()
5✔
199
                    if not cwd:
5✔
200
                        logger.debug("No paths left to traverse")
5✔
201
                        break
5✔
202

203
                    context = self.context
5✔
204

205
                    logger.debug("Inspecting directory %s", self._display_path(cwd))
5✔
206
                    self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, None, cwd)
5✔
207

208
                    if (ktor_py := (cwd / ".kubernator.py")).exists():
5✔
209
                        self._run_handlers(KubernatorPlugin.handle_before_script, False, context, None, cwd)
5✔
210

211
                        for h in self.context._plugins:
5✔
212
                            h.set_context(context)
5✔
213

214
                        self._exec_ktor(ktor_py)
5✔
215

216
                        for h in self.context._plugins:
5✔
217
                            h.set_context(None)
5✔
218

219
                        self._run_handlers(KubernatorPlugin.handle_after_script, True, context, None, cwd)
5✔
220

221
                    self._run_handlers(KubernatorPlugin.handle_after_dir, True, context, None, cwd)
5✔
222

223
                self.context = self._top_dir_context
5✔
224
                context = self.context
5✔
225

226
                self._run_handlers(KubernatorPlugin.handle_apply, True, context, None)
5✔
227

228
                self._run_handlers(KubernatorPlugin.handle_verify, True, context, None)
5✔
229

230
                # Cleanup runs after verify so a verify failure prevents cleanup.
231
                self._run_handlers(KubernatorPlugin.handle_cleanup, True, context, None)
5✔
232
            finally:
233
                self.context = self._top_dir_context
5✔
234
                context = self.context
5✔
235
                self._run_handlers(KubernatorPlugin.handle_shutdown, True, context, None)
5✔
236
        except:  # noqa E722
5✔
237
            raise
5✔
238
        else:
239
            self.context = self._top_dir_context
5✔
240
            context = self.context
5✔
241
            self._run_handlers(KubernatorPlugin.handle_summary, True, context, None)
5✔
242

243
    def discover_plugins(self):
5✔
244
        importlib.invalidate_caches()
5✔
245
        search_path = Path(kubernator.__path__[0], "plugins")
5✔
246
        [importlib.import_module(name)
5✔
247
         for finder, name, is_pkg in
248
         pkgutil.iter_modules([str(search_path)], "kubernator.plugins.")]
249

250
        for plugin in KubernatorPlugin.__subclasses__():
5✔
251
            if plugin._name in self._plugin_types:
5!
252
                logger.warning("Plugin named %r in %r is already reserved by %r and will be ignored",
×
253
                               plugin._name,
254
                               plugin,
255
                               self._plugin_types[plugin._name])
256
            else:
257
                logger.info("Plugin %r discovered in %r", plugin._name, plugin)
5✔
258
                self._plugin_types[plugin._name] = plugin
5✔
259

260
    def assert_plugin(self, plugin: Union[KubernatorPlugin, type[KubernatorPlugin], str],
5✔
261
                      requester: Union[KubernatorPlugin, type[KubernatorPlugin], str]):
262
        context = self.context
5✔
263
        if isinstance(plugin, str):
5!
264
            try:
5✔
265
                plugin_type = self._plugin_types[plugin]
5✔
266
            except KeyError:
×
267
                logger.critical("No known plugin with the name %r", plugin)
×
268
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
269
        elif isinstance(plugin, type):
×
270
            plugin_type = plugin
×
271
        else:
272
            plugin_type = type(plugin)
×
273

274
        for p in context._plugins:
5!
275
            if p._name == plugin_type._name:
5✔
276
                return
5✔
277

278
        raise RuntimeError("Plugin %s requires plugin %s to be initialized",
×
279
                           requester if hasattr(requester, "_name") else requester,
280
                           plugin)
281

282
    def register_plugin(self, plugin: Union[KubernatorPlugin, type, str], **kwargs):
5✔
283
        context = self.context
5✔
284
        if isinstance(plugin, str):
5✔
285
            try:
5✔
286
                plugin_obj = self._plugin_types[plugin]()
5✔
287
            except KeyError:
×
288
                logger.critical("No known plugin with the name %r", plugin)
×
289
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
290
        elif isinstance(plugin, type):
5!
291
            plugin_obj = plugin()
×
292
        else:
293
            plugin_obj = plugin
5✔
294

295
        for p in context._plugins:
5✔
296
            if p._name == plugin_obj._name:
5✔
297
                logger.info("Plugin with name %r already registered, skipping", p._name)
5✔
298
                return
5✔
299

300
        logger.info("Registering plugin %r via %r", plugin_obj._name, plugin_obj)
5✔
301

302
        # Register
303
        self._run_handlers(KubernatorPlugin.register, False, context, plugin_obj, **kwargs)
5✔
304

305
        context._plugins.append(plugin_obj)
5✔
306

307
        # Init
308
        self._run_handlers(KubernatorPlugin.handle_init, False, context, plugin_obj)
5✔
309

310
        # Start
311
        self._run_handlers(KubernatorPlugin.handle_start, False, context, plugin_obj)
5✔
312

313
        # If we're already processing a directory
314
        if "app" in self.context and "cwd" in self.context.app and self.context.app.cwd:
5✔
315
            cwd = self.context.app.cwd
5✔
316
            self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, plugin_obj, cwd)
5✔
317

318
            # If we're already in the script (TODO: is it possible to NOT be in a script?)
319
            if "script" in self.context.app:
5!
320
                self._run_handlers(KubernatorPlugin.handle_before_script, False, context, plugin_obj, cwd)
5✔
321

322
    def _run_handlers(self, __f, __reverse, __context, __plugin, *args, **kwargs):
5✔
323
        f_name = __f.__name__
5✔
324

325
        def run(h):
5✔
326
            h_f = getattr(h, f_name, None)
5✔
327
            if h_f:
5!
328
                logger.trace("Running %r handler on %r with %r, %r", f_name, h, args, kwargs)
5✔
329
                h_f(*args, **kwargs)
5✔
330

331
        if __plugin:
5✔
332
            __plugin.set_context(__context)
5✔
333
            run(__plugin)
5✔
334
        else:
335
            self._dispatch_to_all_plugins(__reverse, __context, run)
5✔
336

337
    def _dispatch_to_all_plugins(self, reverse, context, run):
5✔
338
        plugins = list(self.context._plugins if not reverse else reversed(self.context._plugins))
5✔
339
        for h in plugins:
5✔
340
            h.set_context(context)
5✔
341
        try:
5✔
342
            for h in plugins:
5✔
343
                run(h)
5✔
344
        finally:
345
            for h in plugins:
5✔
346
                h.set_context(None)
5✔
347

348
    def _exec_ktor(self, ktor_py: Path):
5✔
349
        ktor_py_display_path = self._display_path(ktor_py)
5✔
350
        logger.debug("Executing %s", ktor_py_display_path)
5✔
351
        with open(ktor_py, "rb") as f:
5✔
352
            source = f.read()
5✔
353
        co = compile(source, ktor_py_display_path, "exec")
5✔
354
        globs = {"ktor": self.context,
5✔
355
                 "logger": logger.getChild("script")
356
                 }
357
        exec(co, globs)
5✔
358
        logger.debug("Executed %r", ktor_py_display_path)
5✔
359

360
    def next(self) -> Path:
5✔
361
        path_queue: deque[tuple[PropertyDict, Path]] = self.path_q
5✔
362
        if path_queue:
5✔
363
            self.context, path = path_queue.pop()
5✔
364
            return path
5✔
365

366
    def register_cleanup(self, h):
5✔
367
        if not hasattr(h, "cleanup"):
5!
368
            raise RuntimeError("cleanup handler has no cleanup attribute")
×
369
        self._cleanups.append(h)
5✔
370

371
    def cleanup(self):
5✔
372
        for h in self._cleanups:
5✔
373
            h.cleanup()
5✔
374

375
    def register(self, **kwargs):
5✔
376
        self.discover_plugins()
5✔
377

378
    def handle_init(self):
5✔
379
        context = self.context
5✔
380

381
        context.globals.common = dict()
5✔
382
        context.globals.app = dict(display_path=self._display_path,
5✔
383
                                   args=self.args,
384
                                   repository_credentials_provider=self._repository_credentials_provider,
385
                                   walk_remote=self.walk_remote,
386
                                   walk_local=self.walk_local,
387
                                   register_plugin=self.register_plugin,
388
                                   assert_plugin=self.assert_plugin,
389
                                   config_as_dict=config_as_dict,
390
                                   config_parent=config_parent,
391
                                   download_remote_file=download_remote_file,
392
                                   load_remote_file=load_remote_file,
393
                                   register_cleanup=self.register_cleanup,
394
                                   jp=jp,
395
                                   run=self._run,
396
                                   run_capturing_out=self._run_capturing_out,
397
                                   run_passthrough_capturing=self._run_passthrough_capturing,
398
                                   repository=self.repository,
399
                                   StripNL=StripNL,
400
                                   default_includes=Globs(["*"], True),
401
                                   default_excludes=Globs([".*"], True),
402
                                   )
403
        context.globals.app.project = ContextProperty(self._get_project, self._set_project)
5✔
404

405
        context.app = dict(_repository_credentials_provider=None,
5✔
406
                           default_includes=Globs(context.app.default_includes),
407
                           default_excludes=Globs(context.app.default_excludes),
408
                           includes=Globs(context.app.default_includes),
409
                           excludes=Globs(context.app.default_excludes),
410
                           )
411

412
    def _get_project(self, origin):
5✔
413
        segs = []
5✔
414
        cur = self.context
5✔
415
        while cur is not None:
5✔
416
            seg = self._project_segments.get(cur)
5✔
417
            if seg is not None:
5✔
418
                segs.append(seg)
5✔
419
            cur = config_parent(cur)
5✔
420
        if not segs:
5!
NEW
421
            return None
×
422
        segs.reverse()
5✔
423
        return ".".join(segs)
5✔
424

425
    def _set_project(self, origin, value):
5✔
426
        if not isinstance(value, str) or not _PROJECT_SEGMENT_RE.match(value):
5✔
427
            raise ValueError("invalid project segment %r" % (value,))
5✔
428
        ctx = self.context
5✔
429
        if ctx in self._project_segments:
5✔
430
            raise ValueError("project already set to %r in this context" %
5✔
431
                             (self._project_segments[ctx],))
432
        self._project_segments[ctx] = value
5✔
433

434
    def handle_before_dir(self, cwd: Path):
5✔
435
        context = self.context
5✔
436
        app = context.app
5✔
437
        app.includes = Globs(app.default_includes)
5✔
438
        app.excludes = Globs(app.default_excludes)
5✔
439
        app.cwd = cwd
5✔
440
        self._new_paths = []
5✔
441

442
    def handle_before_script(self, cwd: Path):
5✔
443
        context = self.context
5✔
444
        app = context.app
5✔
445
        app.script = (cwd / ".kubernator.py")
5✔
446

447
    def handle_after_script(self, cwd: Path):
5✔
448
        context = self.context
5✔
449
        app = context.app
5✔
450
        del app.script
5✔
451

452
    def handle_after_dir(self, cwd: Path):
5✔
453
        context = self.context
5✔
454
        app = context.app
5✔
455

456
        for f in scan_dir(logger, cwd, lambda d: d.is_dir(), app.excludes, app.includes):
5✔
457
            self._new_paths.append((PropertyDict(_parent=context), f))
5✔
458

459
        self.path_q.extend(reversed(self._new_paths))
5✔
460

461
        del app.cwd
5✔
462

463
    def handle_summary(self):
5✔
464
        if "project" in self.context.globals:
5✔
465
            return
5✔
466
        args = self.args
5✔
467
        unused = ["%s %s" % (flag, val)
5✔
468
                  for flag, val in (("-I/--include-project", args.include_project),
469
                                    ("-X/--exclude-project", args.exclude_project))
470
                  if val]
471
        if unused:
5!
NEW
472
            logger.warning(
×
473
                "Project scoping flag(s) %s had no effect because the project "
474
                "plugin was not registered in this run.", ", ".join(unused))
475

476
    def repository(self, repo):
5✔
477
        repository = Repository(repo, self._repo_cred_augmentation)
5✔
478
        if repository in self.repos:
5!
479
            repository = self.repos[repository]
×
480
        else:
481
            self.repos[repository] = repository
5✔
482
            repository.init(logger, self.context)
5✔
483
            self.register_cleanup(repository)
5✔
484

485
        return repository
5✔
486

487
    def walk_local(self, *paths: Union[Path, str, bytes], keep_context=False):
5✔
488
        for path in paths:
×
489
            p = Path(path)
×
490
            if not p.is_absolute():
×
491
                p = self.context.app.cwd / p
×
492
            self._add_local(p, keep_context)
×
493

494
    def walk_remote(self, repo, *path_prefixes: Union[Path, str, bytes], keep_context=False):
5✔
495
        repository = self.repository(repo)
5✔
496

497
        if path_prefixes:
5!
498
            for path_prefix in path_prefixes:
×
499
                path = Path(path_prefix)
×
500
                if path.is_absolute():
×
501
                    path = Path(*path.parts[1:])
×
502
                self._add_local(repository.local_dir / path, keep_context)
×
503
        else:
504
            self._add_local(repository.local_dir, keep_context)
5✔
505

506
    def set_context(self, context):
5✔
507
        # We are managing the context for everyone so we don't actually set it anywhere
508
        pass
5✔
509

510
    def _add_local(self, path: Path, keep_context=False):
5✔
511
        logger.info("Adding %s to the plan%s", self._display_path(path),
5✔
512
                    " (in parent context)" if keep_context else "")
513
        self._new_paths.append((self.context if keep_context else PropertyDict(_parent=self.context), path))
5✔
514

515
    def _repository_credentials_provider(self,
5✔
516
                                         provider: Optional[
517
                                             Callable[[urllib.parse.SplitResult], tuple[
518
                                                 Optional[str], Optional[str], Optional[str]]]]):
519
        self.context.app._repository_credentials_provider = provider
×
520

521
    def _repo_cred_augmentation(self, url):
5✔
522
        rcp = self.context.app._repository_credentials_provider
5✔
523
        if not rcp:
5!
524
            return url
5✔
525

526
        scheme, username, password = rcp(url)
×
527
        return urllib.parse.SplitResult(scheme if scheme else url.scheme,
×
528
                                        ((
529
                                             username +
530
                                             (
531
                                                 ":" + password if password else "") + "@"
532
                                             if username else "") + url.hostname)
533
                                        if username or password
534
                                        else url.netloc,
535
                                        url.path, url.query, url.fragment)
536

537
    def _path_to_repository(self, path: Path) -> Repository:
5✔
538
        for r in self.repos.values():
5✔
539
            if path.is_relative_to(r.local_dir):
5!
540
                return r
5✔
541

542
    def _display_path(self, path: Path) -> str:
5✔
543
        repo = self._path_to_repository(path)
5✔
544
        return "<%s> %s" % (repo.url_str, path) if repo else str(path)
5✔
545

546
    def _run(self, *args, **kwargs):
5✔
547
        return run(*args, **kwargs)
5✔
548

549
    def _run_capturing_out(self, *args, **kwargs):
5✔
550
        return run_capturing_out(*args, **kwargs)
5✔
551

552
    def _run_passthrough_capturing(self, *args, **kwargs):
5✔
553
        return run_pass_through_capturing(*args, **kwargs)
5✔
554

555
    def __repr__(self):
5✔
556
        return "Kubernator"
5✔
557

558

559
def clear_cache():
5✔
560
    cache_dir = get_app_cache_dir()
×
561
    _clear_cache("Clearing application cache at %s", cache_dir)
×
562

563

564
def clear_k8s_cache():
5✔
565
    cache_dir = get_cache_dir("python")
5✔
566
    _clear_cache("Clearing Kubernetes Client cache at %s", cache_dir)
5✔
567

568

569
def _clear_cache(msg, cache_dir):
5✔
570
    logger.info(msg, cache_dir)
5✔
571
    if cache_dir.exists():
5!
572
        rmtree(cache_dir)
5✔
573

574

575
def pre_cache_k8s_clients(*versions, disable_patching=False):
5✔
576
    proc_logger = logger.getChild("proc")
5✔
577
    stdout_logger = StripNL(proc_logger.info)
5✔
578
    stderr_logger = StripNL(proc_logger.warning)
5✔
579

580
    for v in versions:
5✔
581
        logger.info("Caching K8S client library ~=v%s.0%s...", v,
5✔
582
                    " (no patches)" if disable_patching else "")
583
        install_python_k8s_client(run_pass_through_capturing, v, logger, stdout_logger, stderr_logger, disable_patching)
5✔
584

585

586
def main():
5✔
587
    argparser = define_arg_parse()
5✔
588
    args = argparser.parse_args()
5✔
589
    if not args.pre_cache_k8s_client and args.pre_cache_k8s_client_no_patch is not None:
5!
590
        argparser.error("--pre-cache-k8s-client-no-patch can only be used with --pre-cache-k8s-client")
×
591

592
    if args.log_file:
5✔
593
        log_stream = open(args.log_file, "w")
5✔
594
    else:
595
        log_stream = sys.stderr
5✔
596

597
    init_logging(args.verbose, args.log_format, log_stream)
5✔
598

599
    try:
5✔
600
        if args.clear_cache:
5!
601
            clear_cache()
×
602
            return 0
×
603

604
        if args.clear_k8s_cache:
5✔
605
            clear_k8s_cache()
5✔
606
            return 0
5✔
607

608
        if args.pre_cache_k8s_client:
5✔
609
            pre_cache_k8s_clients(*args.pre_cache_k8s_client,
5✔
610
                                  disable_patching=args.pre_cache_k8s_client_no_patch)
611
            return 0
5✔
612

613
        with App(args) as app:
5✔
614
            app.run()
5✔
615
    except SystemExit as e:
5✔
616
        return e.code
×
617
    except Exception as e:
5✔
618
        logger.fatal("Kubernator terminated with an error: %s", e, exc_info=e)
5✔
619
        return 1
5✔
620
    else:
621
        logger.info("Kubernator terminated successfully")
5✔
622
    finally:
623
        try:
5✔
624
            logging.shutdown()
5✔
625
        finally:
626
            if log_stream != sys.stderr:
5✔
627
                with closing(log_stream):
5✔
628
                    pass
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc