• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 19253952588

11 Nov 2025 03:28AM UTC coverage: 77.467% (+2.1%) from 75.318%
19253952588

push

github

web-flow
Merge pull request #92 from karellen/add_assert_plugin

Add assert_plugin to augment register_plugin to indicate dependency

654 of 1001 branches covered (65.33%)

Branch coverage included in aggregate %.

10 of 17 new or added lines in 3 files covered. (58.82%)

13 existing lines in 2 files now uncovered.

2526 of 3104 relevant lines covered (81.38%)

4.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.84
/src/main/python/kubernator/app.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import argparse
6✔
20
import datetime
6✔
21
import importlib
6✔
22
import logging
6✔
23
import pkgutil
6✔
24
import sys
6✔
25
import urllib.parse
6✔
26
from collections import deque
6✔
27
from collections.abc import MutableMapping, Callable
6✔
28
from pathlib import Path
6✔
29
from shutil import rmtree
6✔
30
from typing import Optional, Union
6✔
31

32
import yaml
6✔
33

34
import kubernator
6✔
35
from kubernator.api import (KubernatorPlugin, Globs, scan_dir, PropertyDict, config_as_dict, config_parent,
6✔
36
                            download_remote_file, load_remote_file, Repository, StripNL, jp, get_app_cache_dir,
37
                            get_cache_dir, install_python_k8s_client)
38
from kubernator.proc import run, run_capturing_out, run_pass_through_capturing
6✔
39

40
# Numeric severity of the custom TRACE level; lower than logging.DEBUG,
# so it is only emitted when a logger is set to TRACE explicitly.
TRACE = 5


def trace(self, msg, *args, **kwargs):
    """
    Log 'msg % args' with severity 'TRACE'.

    This function is installed as the `trace` method of `logging.Logger`,
    so `self` is the logger the call is made on.

    To pass exception information, use the keyword argument exc_info with
    a true value, e.g.

    logger.trace("Houston, we have a %s", "interesting problem", exc_info=1)
    """
    if self.isEnabledFor(TRACE):
        self._log(TRACE, msg, args, **kwargs)


# Register the level name and make `trace` available on every logger
logging.addLevelName(TRACE, "TRACE")
logging.Logger.trace = trace
logger = logging.getLogger("kubernator")
59

60
try:
    # Remove the implicit resolvers PyYAML keeps for scalars starting with "=".
    # NOTE(review): presumably this keeps a bare "=" loading as a plain string
    # rather than a special value tag - confirm against the PyYAML resolver.
    del yaml.resolver.Resolver.yaml_implicit_resolvers["="]
except KeyError:
    # Nothing registered under "=", so there is nothing to remove
    pass
64

65

66
def define_arg_parse():
    """
    Build the command line parser of the `kubernator` program.

    :return: a configured `argparse.ArgumentParser`; nothing is parsed here
    """
    parser = argparse.ArgumentParser(prog="kubernator",
                                     description="Kubernetes Provisioning Tool",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # "Do one thing and exit" options, at most one of which may be given
    g = parser.add_mutually_exclusive_group()
    g.add_argument("--version", action="version", version=kubernator.__version__,
                   help="print version and exit")
    g.add_argument("--clear-cache", action="store_true",
                   help="clear cache and exit")
    g.add_argument("--clear-k8s-cache", action="store_true",
                   help="clear Kubernetes Client cache and exit")
    g.add_argument("--pre-cache-k8s-client", action="extend", nargs="+", type=int,
                   help="download specified K8S client library major(!) version(s) and exit")

    # The default is None rather than False so that a caller can tell
    # "flag not given" apart from an explicit value
    parser.add_argument("--pre-cache-k8s-client-no-patch", action="store_true", default=None,
                        help="do not patch the k8s client being pre-cached")
    parser.add_argument("--log-format", choices=["human", "json"], default="human",
                        help="whether to log for human or machine consumption")
    parser.add_argument("--log-file", type=argparse.FileType("w"), default=None,
                        help="where to log, defaults to `stderr`")
    parser.add_argument("-v", "--verbose", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"],
                        default="INFO", help="how verbose do you want Kubernator to be")
    parser.add_argument("-f", "--file", type=argparse.FileType("w"), default=sys.stdout,
                        help="where to generate results, if necessary")
    parser.add_argument("-o", "--output-format", choices=["json", "json-pretty", "yaml"], default="yaml",
                        help="in what format to generate results")
    parser.add_argument("-p", "--path", dest="path", default=".",
                        type=Path, help="path to start processing")
    #    parser.add_argument("--pre-start-script", default=None, type=Path,
    #                        help="location of the pre-start script")
    #    parser.add_argument("--disconnected", action="store_true", default=False,
    #                        help="do not actually connect to the target Kubernetes")
    #    parser.add_argument("--k8s-version", type=str, default=None,
    #                        help="specify a version of Kubernetes when operating in the disconnected mode")

    # `dry_run` is True unless `--yes` is passed
    parser.add_argument("--yes", action="store_false", default=True, dest="dry_run",
                        help="actually make destructive changes")
    parser.add_argument("command", nargs="?", choices=["dump", "apply"], default="dump",
                        help="whether to dump the proposed changes to the output or to apply them")
    return parser
104

105

106
def init_logging(verbose, output_format, output_file):
    """
    Attach a stream handler to the root logger and set Kubernator verbosity.

    :param verbose: name of the level to set on the "kubernator" logger,
                    e.g. "INFO" or "TRACE"
    :param output_format: "human" for formatted text lines, anything else
                          selects JSON records
    :param output_file: stream to log to; passed as-is to `logging.StreamHandler`,
                        which falls back to `sys.stderr` when given None
    """
    root_log = logging.root

    handler = logging.StreamHandler(output_file)
    root_log.addHandler(handler)

    if output_format == "human":
        if handler.stream.isatty():
            # Imported lazily: only needed when logging to a terminal
            import coloredlogs
            fmt_cls = coloredlogs.ColoredFormatter
        else:
            fmt_cls = logging.Formatter

        def formatTime(record, datefmt=None):
            # ISO-8601 timestamp of the record creation time; `datefmt` is ignored
            return datetime.datetime.fromtimestamp(record.created).isoformat()

        formatter = fmt_cls("%(asctime)s %(name)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
        # Set on the instance, so it is called without `self`
        formatter.formatTime = formatTime
    else:
        import json_log_formatter

        class JSONFormatter(json_log_formatter.JSONFormatter):
            def json_record(self, message, extra, record: logging.LogRecord):
                extra = super(JSONFormatter, self).json_record(message, extra, record)
                extra["ts"] = datetime.datetime.fromtimestamp(record.created)
                extra["name"] = record.name
                extra["level"] = record.levelname
                extra["fn"] = record.filename
                extra["ln"] = record.lineno
                # "ts" replaces the "time" key expected from the base class
                del extra["time"]
                return extra

        formatter = JSONFormatter()

    handler.setFormatter(formatter)
    # `setLevel` resolves registered level names itself, including the custom
    # "TRACE" level, so the private `logging._nameToLevel` map is not needed
    logger.setLevel(verbose)
143

144

145
class App(KubernatorPlugin):
    """
    The application itself, implemented as the plugin named "app".

    It owns the context hierarchy, the queue of directories still to be
    traversed, the known plugin types and the registered plugin instances,
    and it drives the plugin handlers while walking directories.

    It is a context manager: leaving the `with` block runs the registered
    cleanup handlers.
    """
    _name = "app"

    def __init__(self, args):
        """
        :param args: parsed command line arguments; `args.path` is the
                     directory the traversal starts from
        """
        self.args = args
        path = args.path.absolute()

        # The global context refers to itself as `globals`; every other
        # context is created with some other context as its `_parent`
        global_context = PropertyDict()
        global_context.globals = global_context
        context = PropertyDict(_parent=global_context)
        context._plugins = []

        self._top_level_context = context
        self.context = context
        self._top_dir_context = PropertyDict(_parent=self.context)

        self.repos: MutableMapping[Repository, Repository] = dict()
        # Directories to visit, each paired with the context to visit it in
        self.path_q: deque[tuple[PropertyDict, Path]] = deque(((self._top_dir_context, path),))

        # Paths scheduled while the current directory is being processed
        self._new_paths: list[tuple[PropertyDict, Path]] = []

        self._cleanups = []
        self._plugin_types = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()

    def run(self):
        """
        Traverse all queued directories and run the plugin lifecycle.

        For every directory the `handle_before_dir` handlers run, then, if the
        directory contains `.kubernator.py`, that script is executed between
        the `handle_before_script` and `handle_after_script` handlers, and
        finally the `handle_after_dir` handlers run.

        After the traversal `handle_apply` and `handle_verify` are run.
        `handle_shutdown` always runs, even on failure; `handle_summary` only
        runs when nothing raised. Any exception propagates to the caller.
        """
        logger.info("Starting Kubernator version %s", kubernator.__version__)

        self.register_plugin(self)

        try:
            # `next()` switches `self.context` to the context queued with the path
            while cwd := self.next():
                context = self.context

                logger.debug("Inspecting directory %s", self._display_path(cwd))
                self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, None, cwd)

                if (ktor_py := (cwd / ".kubernator.py")).exists():
                    self._run_handlers(KubernatorPlugin.handle_before_script, False, context, None, cwd)

                    # Plugins keep the directory context for the duration of the script
                    for h in self.context._plugins:
                        h.set_context(context)

                    self._exec_ktor(ktor_py)

                    for h in self.context._plugins:
                        h.set_context(None)

                    self._run_handlers(KubernatorPlugin.handle_after_script, True, context, None, cwd)

                self._run_handlers(KubernatorPlugin.handle_after_dir, True, context, None, cwd)

            logger.debug("No paths left to traverse")

            self.context = self._top_dir_context
            context = self.context

            self._run_handlers(KubernatorPlugin.handle_apply, True, context, None)

            self._run_handlers(KubernatorPlugin.handle_verify, True, context, None)
        finally:
            self.context = self._top_dir_context
            context = self.context
            self._run_handlers(KubernatorPlugin.handle_shutdown, True, context, None)

        # Only reached when neither the traversal nor the shutdown raised
        self.context = self._top_dir_context
        context = self.context
        self._run_handlers(KubernatorPlugin.handle_summary, True, context, None)

    def discover_plugins(self):
        """
        Import every module under `kubernator/plugins` and record the plugin
        types found.

        Plugin types are the direct subclasses of `KubernatorPlugin`, keyed by
        their `_name`. The first type seen for a name wins; later ones are
        reported with a warning and ignored.
        """
        importlib.invalidate_caches()
        search_path = Path(kubernator.__path__[0], "plugins")
        # Importing a module is what makes its plugin classes exist
        for _finder, name, _is_pkg in pkgutil.iter_modules([str(search_path)], "kubernator.plugins."):
            importlib.import_module(name)

        for plugin in KubernatorPlugin.__subclasses__():
            if plugin._name in self._plugin_types:
                logger.warning("Plugin named %r in %r is already reserved by %r and will be ignored",
                               plugin._name,
                               plugin,
                               self._plugin_types[plugin._name])
            else:
                logger.info("Plugin %r discovered in %r", plugin._name, plugin)
                self._plugin_types[plugin._name] = plugin

    def assert_plugin(self, plugin: Union[KubernatorPlugin, type[KubernatorPlugin], str],
                      requester: Union[KubernatorPlugin, type[KubernatorPlugin], str]):
        """
        Verify that a plugin is already registered in the current context.

        :param plugin: the required plugin: a plugin name, type or instance
        :param requester: the plugin that needs it; only used in the error message
        :raises RuntimeError: if `plugin` is a name of no known plugin type, or
                              if no plugin with that name is registered
        """
        context = self.context
        if isinstance(plugin, str):
            try:
                plugin_type = self._plugin_types[plugin]
            except KeyError:
                logger.critical("No known plugin with the name %r", plugin)
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
        elif isinstance(plugin, type):
            plugin_type = plugin
        else:
            plugin_type = type(plugin)

        for p in context._plugins:
            if p._name == plugin_type._name:
                return

        # Report both sides by plugin name where one is available
        raise RuntimeError("Plugin %s requires plugin %s to be initialized" %
                           (getattr(requester, "_name", requester), plugin_type._name))

    def register_plugin(self, plugin: Union[KubernatorPlugin, type, str], **kwargs):
        """
        Register a plugin in the current context and bring it up to date.

        A plugin whose name is already registered is skipped. Otherwise its
        `register` handler is run with `kwargs`, it is added to the context's
        plugin list and its `handle_init` and `handle_start` handlers run.
        If a directory (and a script) is being processed at that moment, the
        `handle_before_dir` (and `handle_before_script`) handlers are run for
        the new plugin as well.

        :param plugin: a plugin name, a plugin type to instantiate, or an instance
        :param kwargs: passed to the plugin's `register` handler
        :raises RuntimeError: if `plugin` is a name of no known plugin type
        """
        context = self.context
        if isinstance(plugin, str):
            try:
                plugin_obj = self._plugin_types[plugin]()
            except KeyError:
                logger.critical("No known plugin with the name %r", plugin)
                raise RuntimeError("No known plugin with the name %r" % (plugin,))
        elif isinstance(plugin, type):
            plugin_obj = plugin()
        else:
            plugin_obj = plugin

        for p in context._plugins:
            if p._name == plugin_obj._name:
                logger.info("Plugin with name %r already registered, skipping", p._name)
                return

        logger.info("Registering plugin %r via %r", plugin_obj._name, plugin_obj)

        # Register
        self._run_handlers(KubernatorPlugin.register, False, context, plugin_obj, **kwargs)

        context._plugins.append(plugin_obj)

        # Init
        self._run_handlers(KubernatorPlugin.handle_init, False, context, plugin_obj)

        # Start
        self._run_handlers(KubernatorPlugin.handle_start, False, context, plugin_obj)

        # If we're already processing a directory
        if "app" in self.context and "cwd" in self.context.app and self.context.app.cwd:
            cwd = self.context.app.cwd
            self._run_handlers(KubernatorPlugin.handle_before_dir, False, context, plugin_obj, cwd)

            # If we're already in the script (TODO: is it possible to NOT be in a script?)
            if "script" in self.context.app:
                self._run_handlers(KubernatorPlugin.handle_before_script, False, context, plugin_obj, cwd)

    def _run_handlers(self, __f, __reverse, __context, __plugin, *args, **kwargs):
        """
        Run the handler named like `__f` on one plugin or on all of them.

        When `__plugin` is given, only that plugin is called, with its context
        set to `__context` (and left set). Otherwise every registered plugin
        is called, in reverse registration order if `__reverse` is true, with
        its context set for the call and reset to None afterwards.

        `args` and `kwargs` are passed to the handler.
        """
        f_name = __f.__name__

        def run(h):
            # The handler is looked up by name on the plugin itself
            h_f = getattr(h, f_name, None)
            if h_f:
                logger.trace("Running %r handler on %r with %r, %r", f_name, h, args, kwargs)
                h_f(*args, **kwargs)

        if __plugin:
            __plugin.set_context(__context)
            run(__plugin)
        else:
            self._set_plugin_context(__reverse, __context, run)

    def _set_plugin_context(self, reverse, context, run):
        """
        Call `run` on every registered plugin with `context` set on it.

        The plugin list is copied first, so plugins registered while the
        handlers run are not visited in this pass.
        """
        for h in list(self.context._plugins if not reverse else reversed(self.context._plugins)):
            h.set_context(context)
            run(h)
            h.set_context(None)

    def _exec_ktor(self, ktor_py: Path):
        """
        Compile and execute a `.kubernator.py` script.

        The script runs with two globals: `ktor`, the current context, and
        `logger`, the "script" child of the Kubernator logger.
        """
        ktor_py_display_path = self._display_path(ktor_py)
        logger.debug("Executing %s", ktor_py_display_path)
        with open(ktor_py, "rb") as f:
            source = f.read()
        # The display path is what shows up as the file name in tracebacks
        co = compile(source, ktor_py_display_path, "exec")
        globs = {"ktor": self.context,
                 "logger": logger.getChild("script")
                 }
        exec(co, globs)
        logger.debug("Executed %r", ktor_py_display_path)

    def next(self) -> Optional[Path]:
        """
        Take the next path off the queue and switch to its context.

        Paths are taken from the right end of the queue.

        :return: the next path to process, or None if the queue is empty
        """
        path_queue: deque[tuple[PropertyDict, Path]] = self.path_q
        if path_queue:
            self.context, path = path_queue.pop()
            return path
        return None

    def register_cleanup(self, h):
        """
        Register an object whose `cleanup()` is to be called by `cleanup`.

        :raises RuntimeError: if `h` has no `cleanup` attribute
        """
        if not hasattr(h, "cleanup"):
            raise RuntimeError("cleanup handler has no cleanup attribute")
        self._cleanups.append(h)

    def cleanup(self):
        """Call `cleanup()` on every registered handler, in registration order."""
        for h in self._cleanups:
            h.cleanup()

    def register(self, **kwargs):
        self.discover_plugins()

    def handle_init(self):
        """
        Publish the application API and defaults into the context.

        `globals.app` receives the functions and helpers listed below, plus
        the default include/exclude globs; `app` in the current context then
        receives its own copies of the globs and the credentials provider slot.
        """
        context = self.context

        context.globals.common = dict()
        context.globals.app = dict(display_path=self._display_path,
                                   args=self.args,
                                   repository_credentials_provider=self._repository_credentials_provider,
                                   walk_remote=self.walk_remote,
                                   walk_local=self.walk_local,
                                   register_plugin=self.register_plugin,
                                   assert_plugin=self.assert_plugin,
                                   config_as_dict=config_as_dict,
                                   config_parent=config_parent,
                                   download_remote_file=download_remote_file,
                                   load_remote_file=load_remote_file,
                                   register_cleanup=self.register_cleanup,
                                   jp=jp,
                                   run=self._run,
                                   run_capturing_out=self._run_capturing_out,
                                   run_passthrough_capturing=self._run_passthrough_capturing,
                                   repository=self.repository,
                                   StripNL=StripNL,
                                   default_includes=Globs(["*"], True),
                                   default_excludes=Globs([".*"], True),
                                   )
        # NOTE(review): `context.app` is read here before it is assigned in this
        # context - presumably the lookup falls through to `globals.app` set
        # just above; confirm against PropertyDict
        context.app = dict(_repository_credentials_provider=None,
                           default_includes=Globs(context.app.default_includes),
                           default_excludes=Globs(context.app.default_excludes),
                           includes=Globs(context.app.default_includes),
                           excludes=Globs(context.app.default_excludes),
                           )

    def handle_before_dir(self, cwd: Path):
        """Reset the include/exclude globs to the defaults and record `cwd`."""
        context = self.context
        app = context.app
        app.includes = Globs(app.default_includes)
        app.excludes = Globs(app.default_excludes)
        app.cwd = cwd
        self._new_paths = []

    def handle_before_script(self, cwd: Path):
        context = self.context
        app = context.app
        app.script = (cwd / ".kubernator.py")

    def handle_after_script(self, cwd: Path):
        context = self.context
        app = context.app
        del app.script

    def handle_after_dir(self, cwd: Path):
        """
        Queue the sub-directories of `cwd` along with any paths scheduled
        while `cwd` was being processed.
        """
        context = self.context
        app = context.app

        # Each sub-directory gets a fresh context that has this one as its parent
        for f in scan_dir(logger, cwd, lambda d: d.is_dir(), app.excludes, app.includes):
            self._new_paths.append((PropertyDict(_parent=context), f))

        # The queue is consumed from the right, so reversing here makes the
        # new paths come out in the order they were added
        self.path_q.extend(reversed(self._new_paths))

        del app.cwd

    def repository(self, repo):
        """
        Return the `Repository` for `repo`, initializing it on first use.

        A repository equal to one seen before is not initialized again; the
        existing object is returned instead. A new repository is initialized
        and registered for cleanup.
        """
        repository = Repository(repo, self._repo_cred_augmentation)
        if repository in self.repos:
            repository = self.repos[repository]
        else:
            self.repos[repository] = repository
            repository.init(logger, self.context)
            self.register_cleanup(repository)

        return repository

    def walk_local(self, *paths: Union[Path, str, bytes], keep_context=False):
        """
        Schedule local paths for traversal.

        Relative paths are resolved against the directory being processed.

        :param keep_context: process the paths in the current context rather
                             than in a new child context
        """
        for path in paths:
            p = Path(path)
            if not p.is_absolute():
                p = self.context.app.cwd / p
            self._add_local(p, keep_context)

    def walk_remote(self, repo, *path_prefixes: Union[Path, str, bytes], keep_context=False):
        """
        Schedule paths of a repository for traversal.

        :param repo: repository to walk, see `repository`
        :param path_prefixes: paths inside the repository; absolute paths are
                              taken relative to the repository directory.
                              Without any, the whole repository is scheduled
        :param keep_context: process the paths in the current context rather
                             than in a new child context
        """
        repository = self.repository(repo)

        if path_prefixes:
            for path_prefix in path_prefixes:
                path = Path(path_prefix)
                if path.is_absolute():
                    # Drop the root so the path can be joined to the local directory
                    path = Path(*path.parts[1:])
                self._add_local(repository.local_dir / path, keep_context)
        else:
            self._add_local(repository.local_dir, keep_context)

    def set_context(self, context):
        # We are managing the context for everyone so we don't actually set it anywhere
        pass

    def _add_local(self, path: Path, keep_context=False):
        logger.info("Adding %s to the plan%s", self._display_path(path),
                    " (in parent context)" if keep_context else "")
        self._new_paths.append((self.context if keep_context else PropertyDict(_parent=self.context), path))

    def _repository_credentials_provider(self,
                                         provider: Optional[
                                             Callable[[urllib.parse.SplitResult], tuple[
                                                 Optional[str], Optional[str], Optional[str]]]]):
        """
        Set the credentials provider of the current context.

        :param provider: called with a repository URL, it returns a
                         `(scheme, username, password)` tuple; None disables
                         the augmentation
        """
        self.context.app._repository_credentials_provider = provider

    def _repo_cred_augmentation(self, url):
        """
        Apply the credentials provider of the current context to `url`.

        :param url: a split repository URL
        :return: `url` itself if no provider is set; otherwise a new
                 `SplitResult` with the scheme replaced if the provider
                 returned one, and the user information replaced if the
                 provider returned a user name
        """
        rcp = self.context.app._repository_credentials_provider
        if not rcp:
            return url

        scheme, username, password = rcp(url)

        if username or password:
            # Keep "host[:port]" exactly as given and drop any user information
            # already in the URL. `url.hostname` is not used for this: it has
            # no port and no brackets around IPv6 addresses
            host = url.netloc.rpartition("@")[2]
            if username:
                userinfo = username + (":" + password if password else "") + "@"
            else:
                # A password without a user name is not used
                userinfo = ""
            netloc = userinfo + host
        else:
            netloc = url.netloc

        return urllib.parse.SplitResult(scheme if scheme else url.scheme,
                                        netloc,
                                        url.path, url.query, url.fragment)

    def _path_to_repository(self, path: Path) -> Optional[Repository]:
        """Return the first known repository `path` is located in, if any."""
        for r in self.repos.values():
            if path.is_relative_to(r.local_dir):
                return r
        return None

    def _display_path(self, path: Path) -> str:
        """
        Return `path` as a string, prefixed with the repository URL in angle
        brackets if the path is inside a known repository.
        """
        repo = self._path_to_repository(path)
        return "<%s> %s" % (repo.url_str, path) if repo else str(path)

    def _run(self, *args, **kwargs):
        return run(*args, **kwargs)

    def _run_capturing_out(self, *args, **kwargs):
        return run_capturing_out(*args, **kwargs)

    def _run_passthrough_capturing(self, *args, **kwargs):
        return run_pass_through_capturing(*args, **kwargs)

    def __repr__(self):
        return "Kubernator"
497

498

499
def clear_cache():
    """Delete the application cache directory returned by `get_app_cache_dir`."""
    cache_dir = get_app_cache_dir()
    _clear_cache("Clearing application cache at %s", cache_dir)
502

503

504
def clear_k8s_cache():
    """Delete the Kubernetes Client cache, i.e. the "python" cache directory."""
    cache_dir = get_cache_dir("python")
    _clear_cache("Clearing Kubernetes Client cache at %s", cache_dir)
507

508

509
def _clear_cache(msg, cache_dir):
    """
    Log `msg` and remove the `cache_dir` directory tree if it exists.

    :param msg: log message with a single `%s` placeholder for the directory
    :param cache_dir: directory to remove; a missing directory is not an error
    """
    logger.info(msg, cache_dir)
    if cache_dir.exists():
        rmtree(cache_dir)
513

514

515
def pre_cache_k8s_clients(*versions, disable_patching=False):
    """
    Install the Kubernetes client library for each of the given versions.

    The output of the installation is logged through the "proc" child of the
    Kubernator logger: stdout at INFO and stderr at WARNING level.

    :param versions: major versions of the K8S client library to install
    :param disable_patching: passed on to `install_python_k8s_client`
    """
    proc_logger = logger.getChild("proc")
    stdout_logger = StripNL(proc_logger.info)
    stderr_logger = StripNL(proc_logger.warning)

    for v in versions:
        logger.info("Caching K8S client library ~=v%s.0%s...", v,
                    " (no patches)" if disable_patching else "")
        install_python_k8s_client(run_pass_through_capturing, v, logger, stdout_logger, stderr_logger, disable_patching)
524

525

526
def main():
    """
    Command line entry point.

    Parses the arguments, sets up logging and then either performs one of the
    "... and exit" maintenance actions or runs the application.

    :return: None on success, 1 if an exception terminated the run, or the
             exit code of a `SystemExit` raised along the way
    """
    argparser = define_arg_parse()
    args = argparser.parse_args()
    # The flag defaults to None, so "is not None" means it was actually given
    if not args.pre_cache_k8s_client and args.pre_cache_k8s_client_no_patch is not None:
        argparser.error("--pre-cache-k8s-client-no-patch can only be used with --pre-cache-k8s-client")

    init_logging(args.verbose, args.log_format, args.log_file)

    try:
        if args.clear_cache:
            clear_cache()
            return

        if args.clear_k8s_cache:
            clear_k8s_cache()
            return

        if args.pre_cache_k8s_client:
            pre_cache_k8s_clients(*args.pre_cache_k8s_client,
                                  disable_patching=args.pre_cache_k8s_client_no_patch)
            return

        with App(args) as app:
            app.run()
    except SystemExit as e:
        return e.code
    except Exception as e:
        logger.critical("Kubernator terminated with an error: %s", e, exc_info=e)
        return 1
    else:
        # Not reached by the early returns above
        logger.info("Kubernator terminated successfully")
    finally:
        logging.shutdown()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc