• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 19252312831

11 Nov 2025 01:54AM UTC coverage: 77.467% (+2.1%) from 75.318%
19252312831

Pull #92

github

web-flow
Merge 34555e628 into 1ca3f8432
Pull Request #92: Add assert_plugin to augment register_plugin to indicate dependency

654 of 1001 branches covered (65.33%)

Branch coverage included in aggregate %.

10 of 17 new or added lines in 3 files covered. (58.82%)

13 existing lines in 2 files now uncovered.

2526 of 3104 relevant lines covered (81.38%)

3.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.84
/src/main/python/kubernator/app.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import argparse
4✔
20
import datetime
4✔
21
import importlib
4✔
22
import logging
4✔
23
import pkgutil
4✔
24
import sys
4✔
25
import urllib.parse
4✔
26
from collections import deque
4✔
27
from collections.abc import MutableMapping, Callable
4✔
28
from pathlib import Path
4✔
29
from shutil import rmtree
4✔
30
from typing import Optional, Union
4✔
31

32
import yaml
4✔
33

34
import kubernator
4✔
35
from kubernator.api import (KubernatorPlugin, Globs, scan_dir, PropertyDict, config_as_dict, config_parent,
4✔
36
                            download_remote_file, load_remote_file, Repository, StripNL, jp, get_app_cache_dir,
37
                            get_cache_dir, install_python_k8s_client)
38
from kubernator.proc import run, run_capturing_out, run_pass_through_capturing
4✔
39

40
TRACE = 5
4✔
41

42

43
def trace(self, msg, *args, **kwargs):
4✔
44
    """
45
    Log 'msg % args' with severity 'TRACE'.
46

47
    To pass exception information, use the keyword argument exc_info with
48
    a true value, e.g.
49

50
    logger.trace("Houston, we have a %s", "interesting problem", exc_info=1)
51
    """
52
    if self.isEnabledFor(TRACE):
4✔
53
        self._log(TRACE, msg, args, **kwargs)
4✔
54

55

56
logging.addLevelName(5, "TRACE")
4✔
57
logging.Logger.trace = trace
4✔
58
logger = logging.getLogger("kubernator")
4✔
59

60
try:
4✔
61
    del (yaml.resolver.Resolver.yaml_implicit_resolvers["="])
4✔
62
except KeyError:
4✔
63
    pass
4✔
64

65

66
def define_arg_parse():
4✔
67
    parser = argparse.ArgumentParser(prog="kubernator",
4✔
68
                                     description="Kubernetes Provisioning Tool",
69
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
70
    g = parser.add_mutually_exclusive_group()
4✔
71
    g.add_argument("--version", action="version", version=kubernator.__version__,
4✔
72
                   help="print version and exit")
73
    g.add_argument("--clear-cache", action="store_true",
4✔
74
                   help="clear cache and exit")
75
    g.add_argument("--clear-k8s-cache", action="store_true",
4✔
76
                   help="clear Kubernetes Client cache and exit")
77
    g.add_argument("--pre-cache-k8s-client", action="extend", nargs="+", type=int,
4✔
78
                   help="download specified K8S client library major(!) version(s) and exit")
79
    parser.add_argument("--pre-cache-k8s-client-no-patch", action="store_true", default=None,
4✔
80
                        help="do not patch the k8s client being pre-cached")
81
    parser.add_argument("--log-format", choices=["human", "json"], default="human",
4✔
82
                        help="whether to log for human or machine consumption")
83
    parser.add_argument("--log-file", type=argparse.FileType("w"), default=None,
4✔
84
                        help="where to log, defaults to `stderr`")
85
    parser.add_argument("-v", "--verbose", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"],
4✔
86
                        default="INFO", help="how verbose do you want Kubernator to be")
87
    parser.add_argument("-f", "--file", type=argparse.FileType("w"), default=sys.stdout,
4✔
88
                        help="where to generate results, if necessary")
89
    parser.add_argument("-o", "--output-format", choices=["json", "json-pretty", "yaml"], default="yaml",
4✔
90
                        help="in what format to generate results")
91
    parser.add_argument("-p", "--path", dest="path", default=".",
4✔
92
                        type=Path, help="path to start processing")
93
    #    parser.add_argument("--pre-start-script", default=None, type=Path,
94
    #                        help="location of the pre-start script")
95
    #    parser.add_argument("--disconnected", action="store_true", default=False,
96
    #                        help="do not actually connect to the target Kubernetes")
97
    #    parser.add_argument("--k8s-version", type=str, default=None,
98
    #                        help="specify a version of Kubernetes when operating in the disconnected mode")
99
    parser.add_argument("--yes", action="store_false", default=True, dest="dry_run",
4✔
100
                        help="actually make destructive changes")
101
    parser.add_argument("command", nargs="?", choices=["dump", "apply"], default="dump",
4✔
102
                        help="whether to dump the proposed changes to the output or to apply them")
103
    return parser
4✔
104

105

106
def init_logging(verbose, output_format, output_file):
4✔
107
    root_log = logging.root
4✔
108

109
    handler = logging.StreamHandler(output_file)
4✔
110
    root_log.addHandler(handler)
4✔
111

112
    if output_format == "human":
4✔
113
        if handler.stream.isatty():
4!
114
            import coloredlogs
×
115
            fmt_cls = coloredlogs.ColoredFormatter
×
116

117
        else:
118
            fmt_cls = logging.Formatter
4✔
119

120
        def formatTime(record, datefmt=None):
4✔
121
            return datetime.datetime.fromtimestamp(record.created).isoformat()
4✔
122

123
        formatter = fmt_cls("%(asctime)s %(name)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
4✔
124
        formatter.formatTime = formatTime
4✔
125
    else:
126
        import json_log_formatter
4✔
127

128
        class JSONFormatter(json_log_formatter.JSONFormatter):
4✔
129
            def json_record(self, message, extra, record: logging.LogRecord):
4✔
130
                extra = super(JSONFormatter, self).json_record(message, extra, record)
4✔
131
                extra["ts"] = datetime.datetime.fromtimestamp(record.created)
4✔
132
                extra["name"] = record.name
4✔
133
                extra["level"] = record.levelname
4✔
134
                extra["fn"] = record.filename
4✔
135
                extra["ln"] = record.lineno
4✔
136
                del extra["time"]
4✔
137
                return extra
4✔
138

139
        formatter = JSONFormatter()
4✔
140

141
    handler.setFormatter(formatter)
4✔
142
    logger.setLevel(logging._nameToLevel[verbose])
4✔
143

144

145
class App(KubernatorPlugin):
4✔
146
    _name = "app"
4✔
147

148
    def __init__(self, args):
4✔
149
        self.args = args
4✔
150
        path = args.path.absolute()
4✔
151

152
        global_context = PropertyDict()
4✔
153
        global_context.globals = global_context
4✔
154
        context = PropertyDict(_parent=global_context)
4✔
155
        context._plugins = []
4✔
156

157
        self._top_level_context = context
4✔
158
        self.context = context
4✔
159
        self._top_dir_context = PropertyDict(_parent=self.context)
4✔
160

161
        self.repos: MutableMapping[Repository, Repository] = dict()
4✔
162
        self.path_q: deque[tuple[PropertyDict, Path]] = deque(((self._top_dir_context, path),))
4✔
163

164
        self._new_paths: list[tuple[PropertyDict, Path]] = []
4✔
165

166
        self._cleanups = []
4✔
167
        self._plugin_types = {}
4✔
168

169
    def __enter__(self):
4✔
170
        return self
4✔
171

172
    def __exit__(self, exc_type, exc_val, exc_tb):
4✔
173
        self.cleanup()
4✔
174

175
    def run(self):
4✔
176
        logger.info("Starting Kubernator version %s", kubernator.__version__)
4✔
177

178
        self.register_plugin(self)
4✔
179

180
        try:
4✔
181
            try:
4✔
182
                while True:
3✔
183
                    cwd = self.next()
4✔
184
                    if not cwd:
4✔
185
                        logger.debug("No paths left to traverse")
4✔
186
                        break
4✔
187

188
                    context = self.context
4✔
189

190
                    logger.debug("Inspecting directory %s", self._display_path(cwd))
4✔
191
                    self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, None, cwd)
4✔
192

193
                    if (ktor_py := (cwd / ".kubernator.py")).exists():
4✔
194
                        self._run_handlers(KubernatorPlugin.handle_before_script, False, context, None, cwd)
4✔
195

196
                        for h in self.context._plugins:
4✔
197
                            h.set_context(context)
4✔
198

199
                        self._exec_ktor(ktor_py)
4✔
200

201
                        for h in self.context._plugins:
4✔
202
                            h.set_context(None)
4✔
203

204
                        self._run_handlers(KubernatorPlugin.handle_after_script, True, context, None, cwd)
4✔
205

206
                    self._run_handlers(KubernatorPlugin.handle_after_dir, True, context, None, cwd)
4✔
207

208
                self.context = self._top_dir_context
4✔
209
                context = self.context
4✔
210

211
                self._run_handlers(KubernatorPlugin.handle_apply, True, context, None)
4✔
212

213
                self._run_handlers(KubernatorPlugin.handle_verify, True, context, None)
4✔
214
            finally:
215
                self.context = self._top_dir_context
4✔
216
                context = self.context
4✔
217
                self._run_handlers(KubernatorPlugin.handle_shutdown, True, context, None)
4✔
218
        except:  # noqa E722
4✔
219
            raise
4✔
220
        else:
221
            self.context = self._top_dir_context
4✔
222
            context = self.context
4✔
223
            self._run_handlers(KubernatorPlugin.handle_summary, True, context, None)
4✔
224

225
    def discover_plugins(self):
4✔
226
        importlib.invalidate_caches()
4✔
227
        search_path = Path(kubernator.__path__[0], "plugins")
4✔
228
        [importlib.import_module(name)
4✔
229
         for finder, name, is_pkg in
230
         pkgutil.iter_modules([str(search_path)], "kubernator.plugins.")]
231

232
        for plugin in KubernatorPlugin.__subclasses__():
4✔
233
            if plugin._name in self._plugin_types:
4!
234
                logger.warning("Plugin named %r in %r is already reserved by %r and will be ignored",
×
235
                               plugin._name,
236
                               plugin,
237
                               self._plugin_types[plugin._name])
238
            else:
239
                logger.info("Plugin %r discovered in %r", plugin._name, plugin)
4✔
240
                self._plugin_types[plugin._name] = plugin
4✔
241

242
    def assert_plugin(self, plugin: Union[KubernatorPlugin, type[KubernatorPlugin], str],
4✔
243
                      requester: Union[KubernatorPlugin, type[KubernatorPlugin], str]):
244
        context = self.context
4✔
245
        if isinstance(plugin, str):
4!
246
            try:
4✔
247
                plugin_type = self._plugin_types[plugin]
4✔
NEW
248
            except KeyError:
×
NEW
249
                logger.critical("No known plugin with the name %r", plugin)
×
NEW
250
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
NEW
251
        elif isinstance(plugin, type):
×
NEW
252
            plugin_type = plugin
×
253
        else:
NEW
254
            plugin_type = type(plugin)
×
255

256
        for p in context._plugins:
4!
257
            if p._name == plugin_type._name:
4✔
258
                return
4✔
259

NEW
260
        raise RuntimeError("Plugin %s requires plugin %s to be initialized",
×
261
                           requester if hasattr(requester, "_name") else requester,
262
                           plugin)
263

264
    def register_plugin(self, plugin: Union[KubernatorPlugin, type, str], **kwargs):
4✔
265
        context = self.context
4✔
266
        if isinstance(plugin, str):
4✔
267
            try:
4✔
268
                plugin_obj = self._plugin_types[plugin]()
4✔
269
            except KeyError:
×
270
                logger.critical("No known plugin with the name %r", plugin)
×
271
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
×
272
        elif isinstance(plugin, type):
4!
273
            plugin_obj = plugin()
×
274
        else:
275
            plugin_obj = plugin
4✔
276

277
        for p in context._plugins:
4✔
278
            if p._name == plugin_obj._name:
4✔
279
                logger.info("Plugin with name %r already registered, skipping", p._name)
4✔
280
                return
4✔
281

282
        logger.info("Registering plugin %r via %r", plugin_obj._name, plugin_obj)
4✔
283

284
        # Register
285
        self._run_handlers(KubernatorPlugin.register, False, context, plugin_obj, **kwargs)
4✔
286

287
        context._plugins.append(plugin_obj)
4✔
288

289
        # Init
290
        self._run_handlers(KubernatorPlugin.handle_init, False, context, plugin_obj)
4✔
291

292
        # Start
293
        self._run_handlers(KubernatorPlugin.handle_start, False, context, plugin_obj)
4✔
294

295
        # If we're already processing a directory
296
        if "app" in self.context and "cwd" in self.context.app and self.context.app.cwd:
4✔
297
            cwd = self.context.app.cwd
4✔
298
            self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, plugin_obj, cwd)
4✔
299

300
            # If we're already in the script (TODO: is it possible to NOT be in a script?)
301
            if "script" in self.context.app:
4!
302
                self._run_handlers(KubernatorPlugin.handle_before_script, False, context, plugin_obj, cwd)
4✔
303

304
    def _run_handlers(self, __f, __reverse, __context, __plugin, *args, **kwargs):
4✔
305
        f_name = __f.__name__
4✔
306

307
        def run(h):
4✔
308
            h_f = getattr(h, f_name, None)
4✔
309
            if h_f:
4!
310
                logger.trace("Running %r handler on %r with %r, %r", f_name, h, args, kwargs)
4✔
311
                h_f(*args, **kwargs)
4✔
312

313
        if __plugin:
4✔
314
            __plugin.set_context(__context)
4✔
315
            run(__plugin)
4✔
316
        else:
317
            self._set_plugin_context(__reverse, __context, run)
4✔
318

319
    def _set_plugin_context(self, reverse, context, run):
4✔
320
        for h in list(self.context._plugins if not reverse else reversed(self.context._plugins)):
4✔
321
            h.set_context(context)
4✔
322
            run(h)
4✔
323
            h.set_context(None)
4✔
324

325
    def _exec_ktor(self, ktor_py: Path):
4✔
326
        ktor_py_display_path = self._display_path(ktor_py)
4✔
327
        logger.debug("Executing %s", ktor_py_display_path)
4✔
328
        with open(ktor_py, "rb") as f:
4✔
329
            source = f.read()
4✔
330
        co = compile(source, ktor_py_display_path, "exec")
4✔
331
        globs = {"ktor": self.context,
4✔
332
                 "logger": logger.getChild("script")
333
                 }
334
        exec(co, globs)
4✔
335
        logger.debug("Executed %r", ktor_py_display_path)
4✔
336

337
    def next(self) -> Path:
4✔
338
        path_queue: deque[tuple[PropertyDict, Path]] = self.path_q
4✔
339
        if path_queue:
4✔
340
            self.context, path = path_queue.pop()
4✔
341
            return path
4✔
342

343
    def register_cleanup(self, h):
4✔
344
        if not hasattr(h, "cleanup"):
4!
345
            raise RuntimeError("cleanup handler has no cleanup attribute")
×
346
        self._cleanups.append(h)
4✔
347

348
    def cleanup(self):
4✔
349
        for h in self._cleanups:
4✔
350
            h.cleanup()
4✔
351

352
    def register(self, **kwargs):
4✔
353
        self.discover_plugins()
4✔
354

355
    def handle_init(self):
4✔
356
        context = self.context
4✔
357

358
        context.globals.common = dict()
4✔
359
        context.globals.app = dict(display_path=self._display_path,
4✔
360
                                   args=self.args,
361
                                   repository_credentials_provider=self._repository_credentials_provider,
362
                                   walk_remote=self.walk_remote,
363
                                   walk_local=self.walk_local,
364
                                   register_plugin=self.register_plugin,
365
                                   assert_plugin=self.assert_plugin,
366
                                   config_as_dict=config_as_dict,
367
                                   config_parent=config_parent,
368
                                   download_remote_file=download_remote_file,
369
                                   load_remote_file=load_remote_file,
370
                                   register_cleanup=self.register_cleanup,
371
                                   jp=jp,
372
                                   run=self._run,
373
                                   run_capturing_out=self._run_capturing_out,
374
                                   run_passthrough_capturing=self._run_passthrough_capturing,
375
                                   repository=self.repository,
376
                                   StripNL=StripNL,
377
                                   default_includes=Globs(["*"], True),
378
                                   default_excludes=Globs([".*"], True),
379
                                   )
380
        context.app = dict(_repository_credentials_provider=None,
4✔
381
                           default_includes=Globs(context.app.default_includes),
382
                           default_excludes=Globs(context.app.default_excludes),
383
                           includes=Globs(context.app.default_includes),
384
                           excludes=Globs(context.app.default_excludes),
385
                           )
386

387
    def handle_before_dir(self, cwd: Path):
4✔
388
        context = self.context
4✔
389
        app = context.app
4✔
390
        app.includes = Globs(app.default_includes)
4✔
391
        app.excludes = Globs(app.default_excludes)
4✔
392
        app.cwd = cwd
4✔
393
        self._new_paths = []
4✔
394

395
    def handle_before_script(self, cwd: Path):
4✔
396
        context = self.context
4✔
397
        app = context.app
4✔
398
        app.script = (cwd / ".kubernator.py")
4✔
399

400
    def handle_after_script(self, cwd: Path):
4✔
401
        context = self.context
4✔
402
        app = context.app
4✔
403
        del app.script
4✔
404

405
    def handle_after_dir(self, cwd: Path):
4✔
406
        context = self.context
4✔
407
        app = context.app
4✔
408

409
        for f in scan_dir(logger, cwd, lambda d: d.is_dir(), app.excludes, app.includes):
4✔
410
            self._new_paths.append((PropertyDict(_parent=context), f))
4✔
411

412
        self.path_q.extend(reversed(self._new_paths))
4✔
413

414
        del app.cwd
4✔
415

416
    def repository(self, repo):
4✔
417
        repository = Repository(repo, self._repo_cred_augmentation)
4✔
418
        if repository in self.repos:
4!
419
            repository = self.repos[repository]
×
420
        else:
421
            self.repos[repository] = repository
4✔
422
            repository.init(logger, self.context)
4✔
423
            self.register_cleanup(repository)
4✔
424

425
        return repository
4✔
426

427
    def walk_local(self, *paths: Union[Path, str, bytes], keep_context=False):
4✔
428
        for path in paths:
×
429
            p = Path(path)
×
430
            if not p.is_absolute():
×
431
                p = self.context.app.cwd / p
×
432
            self._add_local(p, keep_context)
×
433

434
    def walk_remote(self, repo, *path_prefixes: Union[Path, str, bytes], keep_context=False):
4✔
435
        repository = self.repository(repo)
4✔
436

437
        if path_prefixes:
4!
438
            for path_prefix in path_prefixes:
×
439
                path = Path(path_prefix)
×
440
                if path.is_absolute():
×
441
                    path = Path(*path.parts[1:])
×
442
                self._add_local(repository.local_dir / path, keep_context)
×
443
        else:
444
            self._add_local(repository.local_dir, keep_context)
4✔
445

446
    def set_context(self, context):
4✔
447
        # We are managing the context for everyone so we don't actually set it anywhere
448
        pass
4✔
449

450
    def _add_local(self, path: Path, keep_context=False):
4✔
451
        logger.info("Adding %s to the plan%s", self._display_path(path),
4✔
452
                    " (in parent context)" if keep_context else "")
453
        self._new_paths.append((self.context if keep_context else PropertyDict(_parent=self.context), path))
4✔
454

455
    def _repository_credentials_provider(self,
4✔
456
                                         provider: Optional[
457
                                             Callable[[urllib.parse.SplitResult], tuple[
458
                                                 Optional[str], Optional[str], Optional[str]]]]):
459
        self.context.app._repository_credentials_provider = provider
×
460

461
    def _repo_cred_augmentation(self, url):
4✔
462
        rcp = self.context.app._repository_credentials_provider
4✔
463
        if not rcp:
4!
464
            return url
4✔
465

466
        scheme, username, password = rcp(url)
×
467
        return urllib.parse.SplitResult(scheme if scheme else url.scheme,
×
468
                                        ((
469
                                             username +
470
                                             (
471
                                                 ":" + password if password else "") + "@"
472
                                             if username else "") + url.hostname)
473
                                        if username or password
474
                                        else url.netloc,
475
                                        url.path, url.query, url.fragment)
476

477
    def _path_to_repository(self, path: Path) -> Repository:
4✔
478
        for r in self.repos.values():
4✔
479
            if path.is_relative_to(r.local_dir):
4!
480
                return r
4✔
481

482
    def _display_path(self, path: Path) -> str:
4✔
483
        repo = self._path_to_repository(path)
4✔
484
        return "<%s> %s" % (repo.url_str, path) if repo else str(path)
4✔
485

486
    def _run(self, *args, **kwargs):
4✔
487
        return run(*args, **kwargs)
4✔
488

489
    def _run_capturing_out(self, *args, **kwargs):
4✔
490
        return run_capturing_out(*args, **kwargs)
4✔
491

492
    def _run_passthrough_capturing(self, *args, **kwargs):
4✔
493
        return run_pass_through_capturing(*args, **kwargs)
4✔
494

495
    def __repr__(self):
4✔
496
        return "Kubernator"
4✔
497

498

499
def clear_cache():
4✔
500
    cache_dir = get_app_cache_dir()
×
501
    _clear_cache("Clearing application cache at %s", cache_dir)
×
502

503

504
def clear_k8s_cache():
4✔
505
    cache_dir = get_cache_dir("python")
4✔
506
    _clear_cache("Clearing Kubernetes Client cache at %s", cache_dir)
4✔
507

508

509
def _clear_cache(msg, cache_dir):
4✔
510
    logger.info(msg, cache_dir)
4✔
511
    if cache_dir.exists():
4!
512
        rmtree(cache_dir)
4✔
513

514

515
def pre_cache_k8s_clients(*versions, disable_patching=False):
4✔
516
    proc_logger = logger.getChild("proc")
4✔
517
    stdout_logger = StripNL(proc_logger.info)
4✔
518
    stderr_logger = StripNL(proc_logger.warning)
4✔
519

520
    for v in versions:
4✔
521
        logger.info("Caching K8S client library ~=v%s.0%s...", v,
4✔
522
                    " (no patches)" if disable_patching else "")
523
        install_python_k8s_client(run_pass_through_capturing, v, logger, stdout_logger, stderr_logger, disable_patching)
4✔
524

525

526
def main():
4✔
527
    argparser = define_arg_parse()
4✔
528
    args = argparser.parse_args()
4✔
529
    if not args.pre_cache_k8s_client and args.pre_cache_k8s_client_no_patch is not None:
4!
530
        argparser.error("--pre-cache-k8s-client-no-patch can only be used with --pre-cache-k8s-client")
×
531

532
    init_logging(args.verbose, args.log_format, args.log_file)
4✔
533

534
    try:
4✔
535
        if args.clear_cache:
4!
536
            clear_cache()
×
537
            return
×
538

539
        if args.clear_k8s_cache:
4✔
540
            clear_k8s_cache()
4✔
541
            return
4✔
542

543
        if args.pre_cache_k8s_client:
4✔
544
            pre_cache_k8s_clients(*args.pre_cache_k8s_client,
4✔
545
                                  disable_patching=args.pre_cache_k8s_client_no_patch)
546
            return
4✔
547

548
        with App(args) as app:
4✔
549
            app.run()
4✔
550
    except SystemExit as e:
4!
551
        return e.code
×
552
    except Exception as e:
4✔
553
        logger.fatal("Kubernator terminated with an error: %s", e, exc_info=e)
4✔
554
        return 1
4✔
555
    else:
556
        logger.info("Kubernator terminated successfully")
4✔
557
    finally:
558
        logging.shutdown()
4!
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc