• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 19199722726

08 Nov 2025 10:51PM UTC coverage: 75.318% (+0.3%) from 75.065%
19199722726

Pull #91

github

web-flow
Merge a29e53ff5 into b66cae610
Pull Request #91: Add Helm ability to check latest versions of the charts used

650 of 1022 branches covered (63.6%)

Branch coverage included in aggregate %.

25 of 25 new or added lines in 1 file covered. (100.0%)

5 existing lines in 3 files now uncovered.

2545 of 3220 relevant lines covered (79.04%)

4.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.5
/src/main/python/kubernator/plugins/kops.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import json
6✔
20
import logging
6✔
21
import os
6✔
22
from pathlib import Path
6✔
23
from tempfile import TemporaryDirectory
6✔
24

25
import yaml
6✔
26

27
from kubernator.api import (KubernatorPlugin, scan_dir,
6✔
28
                            FileType,
29
                            load_remote_file,
30
                            io_StringIO,
31
                            TemplateEngine,
32
                            StripNL,
33
                            Globs, load_file)
34
from kubernator.plugins.k8s_api import K8SResourcePluginMixin
6✔
35
from kubernator.proc import CalledProcessError
6✔
36

37
logger = logging.getLogger("kubernator.kops")
6✔
38
proc_logger = logger.getChild("kops")
6✔
39
stdout_logger = StripNL(proc_logger.info)
6✔
40
stderr_logger = StripNL(proc_logger.warning)
6✔
41

42
KOPS_CRDS = ["kops.k8s.io_clusters.yaml",
6✔
43
             "kops.k8s.io_instancegroups.yaml",
44
             "kops.k8s.io_keysets.yaml",
45
             "kops.k8s.io_sshcredentials.yaml"]
46

47
OBJECT_SCHEMA_VERSION = "1.20.6"
6✔
48

49

50
class KopsPlugin(KubernatorPlugin, K8SResourcePluginMixin):
6✔
51
    logger = logger
6✔
52

53
    _name = "kops"
6✔
54

55
    def __init__(self):
6✔
56
        self.context = None
×
57
        self.kops_stanza = ["kops"]
×
58
        self.version = None
×
59
        self.kops_dir = None
×
60
        self.kops_path = None
×
61

62
        self.template_engine = TemplateEngine(logger)
×
63
        super().__init__()
×
64

65
    def set_context(self, context):
6✔
66
        self.context = context
×
67

68
    def register(self, helm_version=None):
6✔
69
        raise RuntimeError("Currently disabled, please don't use")
×
UNCOV
70
        self.context.app.register_plugin("kubeconfig")
UNCOV
71
        self.context.app.register_plugin("kubectl")
72

73
    def handle_init(self):
6✔
74
        context = self.context
×
75

76
        # Temp dir
77
        self.kops_dir = TemporaryDirectory()
×
78
        context.app.register_cleanup(self.kops_dir)
×
79
        self.kops_path = kops_path = Path(self.kops_dir.name)
×
80
        kops_config = kops_path / ".kops.yaml"
×
81
        kops_config.touch()
×
82
        self.kops_stanza.extend(("--config", str(kops_config)))
×
83

84
        # Kube config dir
85
        kubeconfig_dir = kops_path / ".kube"
×
86
        kubeconfig_dir.mkdir()
×
87
        self.kubeconfig_path = kubeconfig_dir / "config"
×
88

89
        # if logger.getEffectiveLevel() < logging.DEBUG:
90
        #    self.kops_stanza.append("-v5")
91
        # elif logger.getEffectiveLevel() < logging.INFO:
92
        #    self.kops_stanza.append("-v2")
93

94
        context.globals.kops = dict(walk_remote=self.walk_remote,
×
95
                                    walk_local=self.walk_local,
96
                                    update=self.update,
97
                                    export=self.export,
98
                                    master_interval="8m",
99
                                    node_interval="8m"
100
                                    )
101
        context.kops = dict()
×
102

103
        self.version = context.app.run_capturing_out(["kops", "version", "--short"], stderr_logger).strip()
×
104
        logger.info("Found kOps version %s", self.version)
×
105

106
        self.resource_definitions_schema = load_remote_file(logger,
×
107
                                                            f"https://raw.githubusercontent.com/kubernetes/kubernetes/"
108
                                                            f"v{OBJECT_SCHEMA_VERSION}/api/openapi-spec/swagger.json",
109
                                                            FileType.JSON)
110
        self._populate_resource_definitions()
×
111

112
        common_url_path = f"https://raw.githubusercontent.com/kubernetes/kops/v{self.version}/k8s/crds"
×
113
        for kops_crd in KOPS_CRDS:
×
114
            url = f"{common_url_path}/{kops_crd}"
×
115
            self.add_remote_crds(url, FileType.YAML, sub_category="kops")
×
116

117
    def handle_start(self):
6✔
118
        context = self.context
×
119

120
        # Exclude Kops YAMLs from K8S resource loading
121
        context.k8s.default_excludes.add("*.kops.yaml")
×
122
        context.k8s.default_excludes.add("*.kops.yml")
×
123

124
    def walk_remote(self, url, *paths: Path,
6✔
125
                    excludes=(".*",), includes=("*.kops.yaml", "*.kops.yml")):
126
        repository = self.context.app.repository(url)
×
127
        for path in paths:
×
128
            self.walk_local(repository.local_dir / path, excludes, includes)
×
129

130
    def walk_local(self, path: Path, excludes=(".*",), includes=("*.kops.yaml", "*.kops.yml")):
6✔
131
        context = self.context
×
132
        path = Path(path)
×
133

134
        for f in scan_dir(logger, path, lambda d: d.is_file(), Globs(excludes), Globs(includes)):
×
135
            p = path / f.name
×
136
            display_p = context.app.display_path(p)
×
137
            logger.debug("Adding Kops resources from %s", display_p)
×
138

139
            manifests = load_file(logger, p, FileType.YAML, display_p,
×
140
                                  self.template_engine,
141
                                  {"ktor": context})
142

143
            self.add_resources(manifests, display_p)
×
144

145
    def update(self):
6✔
146
        context = self.context
×
147
        run = context.app.run
×
148
        run_capturing_out = context.app.run_capturing_out
×
149

150
        os.environ["KUBECONFIG"] = str(self.kubeconfig_path)
×
151
        os.environ["KOPS_CLUSTER_NAME"] = context.kops.cluster_name
×
152
        os.environ["KOPS_STATE_STORE"] = context.kops.state_store
×
153

154
        kops_extra_args = ["--name", context.kops.cluster_name, "--state", context.kops.state_store]
×
155

156
        cmd = context.app.args.command
×
157
        dry_run = context.app.args.dry_run
×
158

159
        for resource in self.resources.values():
×
160
            logger.info("Replacing/creating kOps resource %s", resource)
×
161
            resource_out = io_StringIO()
×
162
            yaml.dump(resource.manifest, resource_out)
×
163
            if cmd != "apply" or dry_run:
×
164
                logger.info("Would replace kOps resource if not for dry-run mode: %s", resource_out.getvalue())
×
165
            else:
166
                run(self.kops_stanza + ["replace", "--force", "-f", "-"] + kops_extra_args,
×
167
                    stdout_logger,
168
                    stderr_logger,
169
                    resource_out.getvalue()).wait()
170

171
        logger.info("Staging kOps update")
×
172
        update_cmd = self.kops_stanza + ["update", "cluster"] + kops_extra_args
×
173
        result = run_capturing_out(update_cmd, stderr_logger)
×
174
        proc_logger.info(result)
×
175
        if "Must specify --yes to apply changes" in result:
×
176
            logger.info("kOps update would make changes")
×
177
            if cmd != "apply" or dry_run:
×
178
                logger.info("Skipping actual kOps update due to dry-run mode")
×
179
            else:
180
                logger.info("Running kOps update")
×
181
                run(update_cmd + ["--yes"],
×
182
                    stdout_logger,
183
                    stderr_logger).wait()
184

185
        self.export()
×
186

187
        if cmd != "apply":
×
188
            logger.info("Skipping cluster validation since not in apply mode")
×
189
        else:
190
            validation_failed = False
×
191
            try:
×
192
                output = run_capturing_out(self.kops_stanza + ["validate", "cluster", "-o", "json"],
×
193
                                           stderr_logger)
194
            except CalledProcessError as e:
×
195
                validation_failed = True
×
196
                output = e.output
×
197

198
            if validation_failed:
×
199
                if output:
×
200
                    validation_results = json.loads(output)
×
201
                    for failure in validation_results["failures"]:
×
202
                        logger.error("%s %s failed validation: %s", failure["type"], failure["name"],
×
203
                                     failure["message"])
204
                raise RuntimeError("Cluster validation failed!")
×
205
            else:
206
                logger.info("Cluster validation successful!")
×
207

208
        logger.info("Staging kOps cluster rolling update")
×
209
        rolling_update_cmd = self.kops_stanza + ["rolling-update", "cluster",
×
210
                                                 "--master-interval", context.kops.master_interval,
211
                                                 "--node-interval", context.kops.node_interval] + kops_extra_args
212
        result = run_capturing_out(rolling_update_cmd, stderr_logger)
×
213
        proc_logger.info(result)
×
214
        if "Must specify --yes to rolling-update" in result:
×
215
            logger.info("kOps cluster rolling update would make changes")
×
216
            if cmd != "apply" or dry_run:
×
217
                logger.info("Skipping actual kOps cluster rolling update due to dry-run")
×
218
            else:
219
                logger.info("Running kOps cluster rolling update")
×
220
                run(rolling_update_cmd + ["--yes"],
×
221
                    stdout_logger,
222
                    stderr_logger).wait()
223

224
    def export(self):
6✔
225
        context = self.context
×
226
        run = context.app.run
×
227

228
        kops_extra_args = ["--name", context.kops.cluster_name, "--state", context.kops.state_store]
×
229
        logger.info("Exporting kubeconfig from kOps")
×
230
        run(self.kops_stanza + ["export", "kubecfg", "--admin"] + kops_extra_args,
×
231
            stdout_logger,
232
            stderr_logger).wait()
233

234
        if not self.kubeconfig_path.exists():
×
235
            raise RuntimeError("kOps failed to export kubeconfig for unknown reason - check AWS credentials")
×
236

237
    def __repr__(self):
6✔
238
        return "kOps Plugin"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc