• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 15312394424

28 May 2025 11:07PM UTC coverage: 76.019% (-0.3%) from 76.274%
15312394424

push

github

web-flow
Merge pull request #76 from karellen/k8s_1_33_istio_1_26

Add testing for K8S 1.33.x and Istio 1.26.x

610 of 951 branches covered (64.14%)

Branch coverage included in aggregate %.

2373 of 2973 relevant lines covered (79.82%)

3.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.18
/src/main/python/kubernator/plugins/kops.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import json
5✔
20
import logging
5✔
21
import os
5✔
22
from pathlib import Path
5✔
23
from tempfile import TemporaryDirectory
5✔
24

25
import yaml
5✔
26

27
from kubernator.api import (KubernatorPlugin, scan_dir,
5✔
28
                            FileType,
29
                            load_remote_file,
30
                            io_StringIO,
31
                            TemplateEngine,
32
                            StripNL,
33
                            Globs)
34
from kubernator.plugins.k8s_api import K8SResourcePluginMixin
5✔
35
from kubernator.proc import CalledProcessError
5✔
36

37
# Plugin-level logger; also exposed on KopsPlugin as a class attribute.
logger = logging.getLogger("kubernator.kops")

# Child logger dedicated to the output of the kops executable itself.
proc_logger = logger.getChild("kops")

# Stream sinks handed to the process runner: kops stdout is logged at INFO,
# kops stderr at WARNING (both wrapped in StripNL).
stdout_logger = StripNL(proc_logger.info)
stderr_logger = StripNL(proc_logger.warning)
5✔
41

42
# File names of the kOps CRD manifests; each one is fetched from the kOps
# repository (k8s/crds directory of the detected kOps version) during init.
KOPS_CRDS = [
    "kops.k8s.io_clusters.yaml",
    "kops.k8s.io_instancegroups.yaml",
    "kops.k8s.io_keysets.yaml",
    "kops.k8s.io_sshcredentials.yaml",
]

# Kubernetes release whose OpenAPI (swagger.json) spec is downloaded to
# populate the resource definitions.
OBJECT_SCHEMA_VERSION = "1.20.6"
5✔
48

49

50
class KopsPlugin(KubernatorPlugin, K8SResourcePluginMixin):
    """Kubernator plugin that drives a cluster through the ``kops`` command line tool.

    kOps manifests (``*.kops.yaml`` / ``*.kops.yml``) are rendered through the template engine and
    collected as resources; :meth:`update` then pushes them with ``kops replace`` and runs
    ``kops update cluster``, ``kops validate cluster`` and ``kops rolling-update cluster``.

    NOTE: the plugin is currently disabled - :meth:`register` raises unconditionally.
    """
    logger = logger

    _name = "kops"

    def __init__(self):
        self.context = None
        # Base command line; handle_init() appends "--config <private config file>"
        self.kops_stanza = ["kops"]
        self.version = None
        self.kops_dir = None
        self.kops_path = None
        # Assigned by handle_init(); declared here like every other attribute so that an
        # uninitialized plugin is detectable (see _use_private_kubeconfig)
        self.kubeconfig_path = None

        self.template_engine = TemplateEngine(logger)
        super().__init__()

    def set_context(self, context):
        """Remember the Kubernator context this plugin operates on."""
        self.context = context

    def register(self, helm_version=None):
        """Refuse registration: the plugin is currently disabled.

        :param helm_version: unused; kept so the signature stays backward-compatible.
        :raises RuntimeError: always.
        """
        # When the plugin is re-enabled it has to register the plugins it depends on here,
        # i.e. call self.context.app.register_plugin() for "kubeconfig" and "kubectl".
        raise RuntimeError("Currently disabled, please don't use")

    def handle_init(self):
        """Prepare the private kOps working area, publish the plugin API and load schemas.

        Creates a temporary directory holding an empty ``.kops.yaml`` (passed to kOps via
        ``--config``) and a ``.kube`` directory for the exported kubeconfig, exposes the plugin
        functions and default intervals through ``context.globals.kops``, detects the installed
        kOps version and loads the Kubernetes OpenAPI spec plus the kOps CRDs.
        """
        context = self.context

        # Temp dir
        self.kops_dir = TemporaryDirectory()
        context.app.register_cleanup(self.kops_dir)
        self.kops_path = kops_path = Path(self.kops_dir.name)
        kops_config = kops_path / ".kops.yaml"
        kops_config.touch()
        self.kops_stanza.extend(("--config", str(kops_config)))

        # Kube config dir
        kubeconfig_dir = kops_path / ".kube"
        kubeconfig_dir.mkdir()
        self.kubeconfig_path = kubeconfig_dir / "config"

        # if logger.getEffectiveLevel() < logging.DEBUG:
        #    self.kops_stanza.append("-v5")
        # elif logger.getEffectiveLevel() < logging.INFO:
        #    self.kops_stanza.append("-v2")

        context.globals.kops = dict(walk_remote=self.walk_remote,
                                    walk_local=self.walk_local,
                                    update=self.update,
                                    export=self.export,
                                    master_interval="8m",
                                    node_interval="8m"
                                    )
        context.kops = dict()

        self.version = context.app.run_capturing_out(["kops", "version", "--short"], stderr_logger).strip()
        logger.info("Found kOps version %s", self.version)

        self.resource_definitions_schema = load_remote_file(logger,
                                                            "https://raw.githubusercontent.com/kubernetes/kubernetes/"
                                                            f"v{OBJECT_SCHEMA_VERSION}/api/openapi-spec/swagger.json",
                                                            FileType.JSON)
        self._populate_resource_definitions()

        # The CRDs are taken from the kOps source tree tagged with the detected kOps version
        common_url_path = f"https://raw.githubusercontent.com/kubernetes/kops/v{self.version}/k8s/crds"
        for kops_crd in KOPS_CRDS:
            url = f"{common_url_path}/{kops_crd}"
            self.add_remote_crds(url, FileType.YAML, sub_category="kops")

    def handle_start(self):
        """Keep the k8s plugin from loading kOps manifests as regular Kubernetes resources."""
        context = self.context

        # Exclude Kops YAMLs from K8S resource loading
        context.k8s.default_excludes.add("*.kops.yaml")
        context.k8s.default_excludes.add("*.kops.yml")

    def walk_remote(self, url, *paths: Path,
                    excludes=(".*",), includes=("*.kops.yaml", "*.kops.yml")):
        """Load kOps manifests from directories of a remote repository.

        :param url: repository location, resolved through ``context.app.repository``.
        :param paths: directories, relative to the repository's local checkout, to scan.
        :param excludes: glob patterns of file names to skip.
        :param includes: glob patterns of file names to load.
        """
        repository = self.context.app.repository(url)
        for path in paths:
            self.walk_local(repository.local_dir / path, excludes=excludes, includes=includes)

    def walk_local(self, path: Path, excludes=(".*",), includes=("*.kops.yaml", "*.kops.yml")):
        """Load kOps manifests from the files of a local directory.

        Every matching file is rendered as a template (the context is available as ``ktor``)
        and the result is added to the plugin's resources.

        :param path: directory to scan.
        :param excludes: glob patterns of file names to skip.
        :param includes: glob patterns of file names to load.
        """
        context = self.context
        path = Path(path)

        for f in scan_dir(logger, path, lambda d: d.is_file(), Globs(excludes), Globs(includes)):
            p = path / f.name
            display_p = context.app.display_path(p)
            logger.debug("Adding Kops resources from %s", display_p)

            # Decode explicitly as UTF-8 rather than with the locale-dependent default encoding,
            # so the same manifest renders identically on every host
            with open(p, "rt", encoding="utf-8") as file:
                template = self.template_engine.from_string(file.read())

            self.add_resources(template.render({"ktor": context}), display_p)

    def _kops_extra_args(self):
        """Return the ``--name``/``--state`` arguments selecting the configured cluster."""
        kops = self.context.kops
        return ["--name", kops.cluster_name, "--state", kops.state_store]

    def _use_private_kubeconfig(self):
        """Point ``KUBECONFIG`` at the plugin's private kubeconfig file.

        :raises RuntimeError: if handle_init() has not set up the kubeconfig location yet.
        """
        if self.kubeconfig_path is None:
            raise RuntimeError("kOps plugin is not initialized - kubeconfig path is not set")
        os.environ["KUBECONFIG"] = str(self.kubeconfig_path)

    @staticmethod
    def _log_validation_failures(output):
        """Log the failures listed in the JSON output of a failed ``kops validate cluster``.

        Reporting is best-effort: output that is empty, is not JSON or lacks a ``failures`` list
        must never hide the validation failure itself, so such output is logged or ignored
        instead of raising.
        """
        if not output:
            return

        try:
            validation_results = json.loads(output)
        except ValueError:  # includes json.JSONDecodeError
            logger.error("Unable to parse kOps validation output: %s", output)
            return

        failures = validation_results.get("failures") if isinstance(validation_results, dict) else None
        for failure in failures or ():
            logger.error("%s %s failed validation: %s", failure.get("type"), failure.get("name"),
                         failure.get("message"))

    def _validate_cluster(self):
        """Run ``kops validate cluster`` and fail when kOps reports an invalid cluster.

        :raises RuntimeError: if the validation command fails.
        """
        try:
            self.context.app.run_capturing_out(self.kops_stanza + ["validate", "cluster", "-o", "json"],
                                               stderr_logger)
        except CalledProcessError as e:
            self._log_validation_failures(e.output)
            raise RuntimeError("Cluster validation failed!") from e

        logger.info("Cluster validation successful!")

    def update(self):
        """Push the collected kOps resources and bring the cluster up to date.

        Steps: replace/create every collected resource, stage (and, when applying, run)
        ``kops update cluster``, export the kubeconfig, validate the cluster (only for the
        ``apply`` command) and stage/run ``kops rolling-update cluster``.
        Changes are only made when the command is ``apply`` and dry-run is off.

        :raises RuntimeError: if cluster validation fails or the kubeconfig cannot be exported.
        """
        context = self.context
        run = context.app.run
        run_capturing_out = context.app.run_capturing_out

        self._use_private_kubeconfig()
        os.environ["KOPS_CLUSTER_NAME"] = context.kops.cluster_name
        os.environ["KOPS_STATE_STORE"] = context.kops.state_store

        kops_extra_args = self._kops_extra_args()

        cmd = context.app.args.command
        # Anything but a real (non dry-run) "apply" only reports what would be done
        apply_changes = cmd == "apply" and not context.app.args.dry_run

        for resource in self.resources.values():
            logger.info("Replacing/creating kOps resource %s", resource)
            resource_out = io_StringIO()
            yaml.dump(resource.manifest, resource_out)
            manifest = resource_out.getvalue()
            if apply_changes:
                # The manifest is fed to kops through stdin ("-f -")
                run(self.kops_stanza + ["replace", "--force", "-f", "-"] + kops_extra_args,
                    stdout_logger,
                    stderr_logger,
                    manifest).wait()
            else:
                logger.info("Would replace kOps resource if not for dry-run mode: %s", manifest)

        logger.info("Staging kOps update")
        update_cmd = self.kops_stanza + ["update", "cluster"] + kops_extra_args
        result = run_capturing_out(update_cmd, stderr_logger)
        proc_logger.info(result)
        # Without --yes kops only previews; this phrase in the preview means there are changes
        if "Must specify --yes to apply changes" in result:
            logger.info("kOps update would make changes")
            if apply_changes:
                logger.info("Running kOps update")
                run(update_cmd + ["--yes"],
                    stdout_logger,
                    stderr_logger).wait()
            else:
                logger.info("Skipping actual kOps update due to dry-run mode")

        self.export()

        # Validation depends on the command only: a dry-run "apply" still validates
        if cmd != "apply":
            logger.info("Skipping cluster validation since not in apply mode")
        else:
            self._validate_cluster()

        logger.info("Staging kOps cluster rolling update")
        rolling_update_cmd = self.kops_stanza + ["rolling-update", "cluster",
                                                 "--master-interval", context.kops.master_interval,
                                                 "--node-interval", context.kops.node_interval] + kops_extra_args
        result = run_capturing_out(rolling_update_cmd, stderr_logger)
        proc_logger.info(result)
        if "Must specify --yes to rolling-update" in result:
            logger.info("kOps cluster rolling update would make changes")
            if apply_changes:
                logger.info("Running kOps cluster rolling update")
                run(rolling_update_cmd + ["--yes"],
                    stdout_logger,
                    stderr_logger).wait()
            else:
                logger.info("Skipping actual kOps cluster rolling update due to dry-run")

    def export(self):
        """Export the cluster's admin kubeconfig via ``kops export kubecfg``.

        :raises RuntimeError: if the plugin is not initialized or the kubeconfig file is missing
            after the export.
        """
        context = self.context
        run = context.app.run

        # update() sets KUBECONFIG before exporting; do the same here so that a direct call
        # (export is published through context.globals.kops) targets the file checked below
        self._use_private_kubeconfig()

        logger.info("Exporting kubeconfig from kOps")
        run(self.kops_stanza + ["export", "kubecfg", "--admin"] + self._kops_extra_args(),
            stdout_logger,
            stderr_logger).wait()

        if not self.kubeconfig_path.exists():
            raise RuntimeError("kOps failed to export kubeconfig for unknown reason - check AWS credentials")

    def __repr__(self):
        return "kOps Plugin"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc