• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 18360533659

08 Oct 2025 11:15PM UTC coverage: 75.441% (+0.1%) from 75.334%
18360533659

push

github

web-flow
Merge pull request #82 from karellen/minikube_storage_addons

Minikube storage addons initialization

627 of 982 branches covered (63.85%)

Branch coverage included in aggregate %.

30 of 33 new or added lines in 2 files covered. (90.91%)

2 existing lines in 1 file now uncovered.

2454 of 3102 relevant lines covered (79.11%)

4.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.49
/src/main/python/kubernator/plugins/kubectl.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2023 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import io
6✔
20
import json
6✔
21
import logging
6✔
22
import os
6✔
23
import tempfile
6✔
24
from pathlib import Path
6✔
25
from shutil import which, copy
6✔
26

27
import yaml
6✔
28

29
from kubernator.api import (KubernatorPlugin,
6✔
30
                            prepend_os_path,
31
                            StripNL,
32
                            get_golang_os,
33
                            get_golang_machine
34
                            )
35

36
logger = logging.getLogger("kubernator.kubectl")
6✔
37
proc_logger = logger.getChild("proc")
6✔
38
stdout_logger = StripNL(proc_logger.info)
6✔
39
stderr_logger = StripNL(proc_logger.warning)
6✔
40

41

42
class KubectlPlugin(KubernatorPlugin):
6✔
43
    logger = logger
6✔
44

45
    _name = "kubectl"
6✔
46

47
    def __init__(self):
6✔
48
        self.context = None
6✔
49
        self.kubectl_dir = None
6✔
50
        super().__init__()
6✔
51

52
    def set_context(self, context):
6✔
53
        self.context = context
6✔
54

55
    def stanza(self):
6✔
56
        context = self.context.kubectl
6✔
57
        return [context.kubectl_file, f"--kubeconfig={self.context.kubeconfig.kubeconfig}"]
6✔
58

59
    def register(self,
6✔
60
                 version=None,
61
                 kubeconfig=None):
62
        context = self.context
6✔
63

64
        context.app.register_plugin("kubeconfig")
6✔
65

66
        if not version and "k8s" in context:
6!
UNCOV
67
            version = ".".join(context.k8s.server_version)
×
UNCOV
68
            logger.info("Kubernetes plugin is running with server version %s - will use it", version)
×
69

70
        if version:
6!
71
            # Download and use specific version
72
            kubectl_url = f"https://dl.k8s.io/release/v{version}/bin/" \
6✔
73
                          f"{get_golang_os()}/" \
74
                          f"{get_golang_machine()}/kubectl"
75
            kubectl_file_dl, _ = context.app.download_remote_file(logger, kubectl_url, "bin")
6✔
76
            kubectl_file_dl = str(kubectl_file_dl)
6✔
77
            self.kubectl_dir = tempfile.TemporaryDirectory()
6✔
78
            context.app.register_cleanup(self.kubectl_dir)
6✔
79

80
            kubectl_file = str(Path(self.kubectl_dir.name) / "kubectl")
6✔
81
            copy(kubectl_file_dl, kubectl_file)
6✔
82
            os.chmod(kubectl_file, 0o500)
6✔
83
            prepend_os_path(self.kubectl_dir.name)
6✔
84
        else:
85
            # Use current version
86
            kubectl_file = which("kubectl")
×
87
            if not kubectl_file:
×
88
                raise RuntimeError("`kubectl` cannot be found and no version has been specified")
×
89

90
            logger.debug("Found kubectl in %r", kubectl_file)
×
91

92
        context.globals.kubectl = dict(version=version,
6✔
93
                                       kubectl_file=kubectl_file,
94
                                       stanza=self.stanza,
95
                                       test=self.test_kubectl,
96
                                       run=self.run,
97
                                       run_capturing=self.run_capturing,
98
                                       get=self.get,
99
                                       )
100

101
        context.globals.kubectl.version = context.kubectl.test()
6✔
102

103
    def run_capturing(self, *args, **kwargs):
6✔
104
        return self.context.app.run_capturing_out(self.stanza() +
6✔
105
                                                  list(args),
106
                                                  stderr_logger,
107
                                                  **kwargs
108
                                                  )
109

110
    def run(self, *args, **kwargs):
6✔
111
        self.context.app.run(self.stanza() +
6✔
112
                             list(args),
113
                             stdout_logger,
114
                             stderr_logger,
115
                             **kwargs
116
                             ).wait()
117

118
    def get(self, resource_type, resource_name, namespace=None):
6✔
119
        args = ["get", resource_type, resource_name]
6✔
120
        if namespace:
6!
NEW
121
            args += ["-n", namespace]
×
122
        args += ["-o", "yaml"]
6✔
123

124
        res = list(yaml.safe_load_all(io.StringIO(self.context.kubectl.run_capturing(*args))))
6✔
125
        if len(res):
6!
126
            if len(res) > 1:
6!
NEW
127
                return res
×
128
            return res[0]
6✔
NEW
129
        return None
×
130

131
    def test_kubectl(self):
6✔
132
        version_out: str = self.run_capturing("version", "--client=true", "-o", "json")
6✔
133

134
        version_out_js = json.loads(version_out)
6✔
135
        kubectl_version = version_out_js["clientVersion"]["gitVersion"][1:]
6✔
136

137
        logger.info("Using kubectl %r version %r with stanza %r",
6✔
138
                    self.context.kubectl.kubectl_file, kubectl_version, self.stanza())
139

140
        return kubectl_version
6✔
141

142
    def __repr__(self):
6✔
143
        return "Kubectl Plugin"
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc