karellen / kubernator / 24637370495

19 Apr 2026 07:35PM UTC coverage: 82.036% (+0.04%) from 81.992%
push

github

web-flow
Add project plugin v1: scope, ownership, cleanup (#103)

## Summary

Introduces a **project plugin** that adds hierarchical scope, ownership
tracking, and cleanup to Kubernator. Kubernator today is stateless —
every run reconciles everything it discovers and cannot tell whether a
resource it sees was applied by us, by someone else, or by a previous
run that has since dropped the manifest. Project v1 fixes all three:

- **Scope.** A user registers the project plugin once at the root with a
name; sub-directories extend the project via ``ktor.app.project =
"<segment>"`` (dot-joined top-down). CLI flags ``-I``/``-X`` scope a
run to specific sub-trees (repeatable, combinable: ``candidates = known
∩ includes`` then ``in_scope = candidates − excludes``).
- **Ownership.** Every applied resource is stamped with a
``kubernator.io/project`` annotation carrying the composed path. A
single gzipped JSON Secret per top-level project
(``<state_namespace>/kubernator-project-<sha1(root)[:12]>``) records the
idents of resources we own.
- **Cleanup.** On each run, the prior Secret's idents are diffed against
the current in-scope manifests; when ``cleanup=True``, resources that
used to be ours but are no longer in scope get deleted. Resources that
moved between sub-projects within the run's in-scope set are detected
and preserved (dedup on identity, ignoring project).
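
The scope rule can be sketched as plain set algebra. The snippet below is a minimal illustration rather than the plugin's code: `project_matches` follows the prefix rule for dot-joined paths, while `in_scope`, the sample project names, and the choice to apply the same prefix rule to excludes are assumptions made for the example.

```python
def project_matches(project: str, pattern: str) -> bool:
    """Prefix match on dot-joined paths: a pattern selects itself and its sub-projects."""
    return project == pattern or project.startswith(pattern + ".")


def in_scope(known: set, includes: list, excludes: list) -> set:
    """candidates = known ∩ includes, then in_scope = candidates − excludes."""
    if includes:
        candidates = {p for p in known
                      if any(project_matches(p, i) for i in includes)}
    else:
        # With no -I flag, every known project is a candidate.
        candidates = set(known)
    # Assumption: excludes use the same prefix rule as includes.
    return {p for p in candidates
            if not any(project_matches(p, x) for x in excludes)}


# Hypothetical project tree, for illustration only.
known = {"shop", "shop.web", "shop.web.cdn", "shop.db", "shopfront"}
selected = in_scope(known, includes=["shop.web", "shop.db"], excludes=["shop.web.cdn"])
print(sorted(selected))  # ['shop.db', 'shop.web']
```

Matching on `pattern + "."` rather than on a bare string prefix keeps an unrelated project such as `shopfront` from being selected by `-I shop`.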

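The Secret naming and payload encoding described under **Ownership** can be illustrated with the standard library alone. This is a sketch under assumptions: the helper names and the sample payload shape are hypothetical, and the real Secret carries more fields than shown here.

```python
import base64
import gzip
import hashlib
import json


def state_secret_name(root: str) -> str:
    """kubernator-project-<sha1(root)[:12]>, one Secret per top-level project."""
    digest = hashlib.sha1(root.encode("utf-8")).hexdigest()
    return "kubernator-project-%s" % digest[:12]


def encode_state(payload: dict) -> str:
    """Compact JSON, then gzip, then base64 so the result is plain ASCII."""
    raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


def decode_state(encoded: str) -> dict:
    """Inverse of encode_state."""
    return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf-8"))


# Hypothetical payload: a list of owned-resource idents.
payload = {"resources": [
    {"group": "apps", "kind": "Deployment", "name": "web", "namespace": "shop"},
]}
name = state_secret_name("shop")
roundtrip = decode_state(encode_state(payload))
print(name, roundtrip == payload)
```

Hashing the root keeps the Secret name within Kubernetes naming rules regardless of what characters the project name contains, and sorting keys makes the encoded payload deterministic for identical state.
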
### Crash resilience

The Secret write is two-phase: before ``handle_apply`` runs, the new
intent is recorded as ``pending`` with ``finalized=false`` while
``resources`` still holds the prior finalized set. After
``handle_cleanup`` succeeds, the finalize phase commits the merged ``resources``
and clears ``pending`` with ``finalized=true``. A crash between the two
phases leaves ``finalized=false``; the next run's cleanup takes the
conservative union (``resources ∪ pending``) so no previously-owned
resource ever slips past cleanup.
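
A toy model of the two-phase write may make the recovery rule concrete. The field names follow the description above; everything else (the in-memory `state` dict, the function names, identifying resources by plain strings) is an assumption for illustration. How the finalize phase merges sets for partially scoped runs is not covered here, so the sketch assumes a full-scope run and simply commits the pending intent.

```python
def write_pre_apply(state: dict, intent: set) -> None:
    """Phase 1: record the new intent; the prior finalized set stays untouched."""
    state["pending"] = set(intent)
    state["finalized"] = False


def write_finalize(state: dict) -> None:
    """Phase 2: commit and clear pending (simplified: full-scope run assumed)."""
    state["resources"] = set(state["pending"])
    state["pending"] = set()
    state["finalized"] = True


def previously_owned(state: dict) -> set:
    """Resources cleanup must consider; a conservative union after a crash."""
    if state["finalized"]:
        return set(state["resources"])
    return state["resources"] | state["pending"]


state = {"finalized": True, "resources": {"cm/a", "cm/b"}, "pending": set()}
write_pre_apply(state, {"cm/b", "cm/c"})
# Simulated crash: write_finalize() never ran, so finalized stays False.

current_manifests = {"cm/b"}  # the next run no longer declares cm/a or cm/c
obsolete = previously_owned(state) - current_manifests
print(sorted(obsolete))  # ['cm/a', 'cm/c']
```

Without the union, `cm/c` (possibly created by the crashed run) would be absent from `resources` and could never be cleaned up; with it, both the old and the in-flight sets are reconsidered.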

### Concurrent-run protection

A ``coordination.k8s.io/v1.Lease`` named
``kubernator... (continued)

1078 of 1494 branches covered (72.16%)

Branch coverage included in aggregate %.

443 of 561 new or added lines in 5 files covered. (78.97%)

3 existing lines in 1 file now uncovered.

4434 of 5225 relevant lines covered (84.86%)

4.23 hits per line

Source File

79.19
/src/main/python/kubernator/plugins/k8s.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19

20
import base64
5✔
21
import gzip
5✔
22
import hashlib
5✔
23
import json
5✔
24
import logging
5✔
25
import os
5✔
26
import re
5✔
27
import socket
5✔
28
import sys
5✔
29
import types
5✔
30
import uuid
5✔
31
from collections.abc import Mapping
5✔
32
from datetime import datetime, timedelta, timezone
5✔
33
from functools import partial, wraps
5✔
34
from importlib.metadata import version as pkg_version
5✔
35
from pathlib import Path
5✔
36
from typing import Iterable, Callable, Sequence, Optional
5✔
37

38
import gevent
5✔
39
import jsonpatch
5✔
40
import yaml
5✔
41

42
from kubernator.api import (KubernatorPlugin,
5✔
43
                            Globs,
44
                            scan_dir,
45
                            load_file,
46
                            FileType,
47
                            StripNL,
48
                            install_python_k8s_client,
49
                            TemplateEngine,
50
                            calling_frame_source,
51
                            parse_yaml_docs)
52
from kubernator.merge import extract_merge_instructions, apply_merge_instructions
5✔
53
from kubernator.plugins import k8s_schema
5✔
54
from kubernator.plugins.k8s_api import (K8SResourcePluginMixin,
5✔
55
                                        K8SResource,
56
                                        K8SResourceKey,
57
                                        K8SResourcePatchType,
58
                                        K8SPropagationPolicy,
59
                                        PROJECT_ANNOTATION,
60
                                        api_exc_format_body,
61
                                        api_exc_normalize_body)
62

63
logger = logging.getLogger("kubernator.k8s")
5✔
64
proc_logger = logger.getChild("proc")
5✔
65
stdout_logger = StripNL(proc_logger.info)
5✔
66
stderr_logger = StripNL(proc_logger.warning)
5✔
67

68
FIELD_VALIDATION_STRICT_MARKER = "strict decoding error: "
5✔
69
VALID_FIELD_VALIDATION = ("Ignore", "Warn", "Strict")
5✔
70

71
PROJECT_STATE_VERSION = "1"
5✔
72
PROJECT_STATE_SECRET_TYPE = "kubernator.io/project-state"
5✔
73
PROJECT_STATE_SECRET_DATA_KEY = "state"
5✔
74
PROJECT_ROOT_ANNOTATION = "kubernator.io/project-root"
5✔
75
PROJECT_LEASE_DURATION_SECONDS = 60
5✔
76
PROJECT_LEASE_RENEW_INTERVAL_SECONDS = 20
5✔
77
PROJECT_LEASE_ACQUIRE_ATTEMPTS = 3
5✔
78

79
# State Secret payload fields — also kept as constants so the wire format
80
# is changed through a single declaration.
81
_STATE_VERSION = "version"
5✔
82
_STATE_FINALIZED = "finalized"
5✔
83
_STATE_RESOURCES = "resources"
5✔
84
_STATE_PENDING = "pending"
5✔
85

86

87
def _project_matches(p: str, q: str) -> bool:
5✔
88
    """Prefix match: ``p`` matches ``q`` iff ``p == q`` or ``p`` starts with
89
    ``q + "."``. Sub-projects of ``q`` are considered matches."""
90
    return p == q or p.startswith(q + ".")
5✔
91

92

93
def _root_hash(root: str) -> str:
5✔
94
    return hashlib.sha1(root.encode("utf-8")).hexdigest()[:12]
5✔
95

96

97
def _state_secret_name(root: str) -> str:
5✔
98
    return "kubernator-project-%s" % (_root_hash(root),)
5✔
99

100

101
def _lease_name(root: str) -> str:
5✔
102
    return "kubernator-project-%s-lock" % (_root_hash(root),)
5✔
103

104

105
def _resource_ident(resource: K8SResource) -> dict:
5✔
106
    """Minimal identifier dict for a K8SResource — enough to delete it on a
107
    subsequent run. ``version`` is needed to build ``apiVersion`` for the
108
    delete call; it is intentionally excluded from ``_ident_key`` so that a
109
    CRD version bump doesn't spuriously mark a resource obsolete."""
110
    key = resource.key
5✔
111
    ident = {
5✔
112
        "group": key.group or "",
113
        "version": resource.rdef.version,
114
        "kind": key.kind,
115
        "name": key.name,
116
    }
117
    if key.namespace:
5✔
118
        ident["namespace"] = key.namespace
5✔
119
    return ident
5✔
120

121

122
def _ident_key(ident: dict) -> K8SResourceKey:
5✔
123
    return K8SResourceKey(ident.get("group", ""),
5✔
124
                          ident.get("kind", ""),
125
                          ident.get("name", ""),
126
                          ident.get("namespace"))
127

128

129
def _encode_state(payload: dict) -> str:
5✔
130
    raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
5✔
131
    compressed = gzip.compress(raw, compresslevel=6)
5✔
132
    return base64.b64encode(compressed).decode("ascii")
5✔
133

134

135
def _decode_state(encoded: str) -> dict:
5✔
136
    compressed = base64.b64decode(encoded.encode("ascii"))
5✔
137
    raw = gzip.decompress(compressed)
5✔
138
    return json.loads(raw.decode("utf-8"))
5✔
139

140

141
def _empty_state_payload() -> dict:
5✔
142
    return {
5✔
143
        _STATE_VERSION: PROJECT_STATE_VERSION,
144
        _STATE_FINALIZED: True,
145
        _STATE_RESOURCES: {},
146
        _STATE_PENDING: {},
147
    }
148

149

150
def _lease_identity() -> str:
5✔
151
    """Return a unique identifier for this run — hostname-pid-uuid4."""
152
    return "%s-%d-%s" % (socket.gethostname(), os.getpid(), uuid.uuid4().hex[:8])
5✔
153

154

155
def _pretty_api_exc(func):
5✔
156
    """Decorator: normalize then pretty-format any ``ApiException`` that
157
    propagates out, so logs and error traces see indented JSON instead of
158
    raw bytes or ``dict`` reprs. Matches the ``_normalize_api_exc`` /
159
    ``api_exc_format_body`` pattern used elsewhere in k8s_api.py."""
160

161
    @wraps(func)
5✔
162
    def wrapper(*args, **kwargs):
5✔
163
        from kubernetes.client.rest import ApiException
5✔
164
        try:
5✔
165
            return func(*args, **kwargs)
5✔
166
        except ApiException as e:
5✔
167
            api_exc_normalize_body(e)
5✔
168
            api_exc_format_body(e)
5✔
169
            raise
5✔
170

171
    return wrapper
5✔
172

173

174
def final_resource_validator(resources: Sequence[K8SResource],
5✔
175
                             resource: K8SResource,
176
                             error: Callable[..., Exception]) -> Iterable[Exception]:
177
    final_key = resource.get_manifest_key(resource.manifest)
5✔
178
    if final_key != resource.key:
5!
179
        yield error("Illegal change of identifiers of the resource "
×
180
                    "%s from %s have been changed to %s",
181
                    resource.key, resource.source, final_key)
182

183
    if resource.rdef.namespaced and not resource.namespace:
5!
184
        yield error("Namespaced resource %s from %s is missing the required namespace",
×
185
                    resource, resource.source)
186

187

188
def normalize_pkg_version(v: str):
5✔
189
    v_split = v.split(".")
5✔
190
    rev = v_split[-1]
5✔
191
    if not rev.isdigit():
5✔
192
        new_rev = ""
5✔
193
        for c in rev:
5!
194
            if not c.isdigit():
5✔
195
                break
5✔
196
            new_rev += c
5✔
197
        v_split[-1] = new_rev
5✔
198
    return tuple(map(int, v_split))
5✔
199

200

201
class KubernetesPlugin(KubernatorPlugin, K8SResourcePluginMixin):
5✔
202
    logger = logger
5✔
203

204
    _name = "k8s"
5✔
205

206
    def __init__(self):
5✔
207
        super().__init__()
5✔
208
        self.context = None
5✔
209

210
        self.embedded_pkg_version = self._get_kubernetes_client_version()
5✔
211

212
        self._transformers = []
5✔
213
        self._validators = []
5✔
214
        self._manifest_patchers = []
5✔
215
        self._resource_filters = []
5✔
216
        self._summary = 0, 0, 0
5✔
217
        self._template_engine = TemplateEngine(logger)
5✔
218
        self._in_scope_projects: Optional[set] = None
5✔
219
        # Project-run state, populated when the project plugin switch is on.
220
        self._project_lease_identity: Optional[str] = None
5✔
221
        self._project_lease_acquired = False
5✔
222
        self._project_lease_renewer: Optional[gevent.Greenlet] = None
5✔
223
        self._project_lease_abort = False
5✔
224
        self._project_prior_state: Optional[dict] = None
5✔
225
        self._project_new_intent: Optional[dict] = None
5✔
226
        # Cached resourceVersion for the state Secret, populated by the first
227
        # read and refreshed after each write so subsequent writes skip their
228
        # own GETs.
229
        self._project_state_rv: Optional[str] = None
5✔
230

231
    def set_context(self, context):
5✔
232
        self.context = context
5✔
233

234
    def register(self,
5✔
235
                 field_validation="Warn",
236
                 field_validation_warn_fatal=True,
237
                 disable_client_patches=False,
238
                 openapi_version="auto",
239
                 openapi_source="auto"):
240
        self.context.app.register_plugin("kubeconfig")
5✔
241

242
        if field_validation not in VALID_FIELD_VALIDATION:
5!
243
            raise ValueError("'field_validation' must be one of %s" % (", ".join(VALID_FIELD_VALIDATION)))
×
244

245
        if openapi_version not in ("auto", "v2", "v3"):
5!
246
            raise ValueError("'openapi_version' must be auto|v2|v3")
×
247
        if openapi_source not in ("auto", "cluster", "github"):
5!
248
            raise ValueError("'openapi_source' must be auto|cluster|github")
×
249

250
        context = self.context
5✔
251
        context.globals.k8s = dict(patch_field_excludes=("^/metadata/managedFields",
5✔
252
                                                         "^/metadata/generation",
253
                                                         "^/metadata/creationTimestamp",
254
                                                         "^/metadata/resourceVersion",
255
                                                         ),
256
                                   openapi_version=openapi_version,
257
                                   openapi_source=openapi_source,
258
                                   immutable_changes={("apps", "DaemonSet"): K8SPropagationPolicy.BACKGROUND,
259
                                                      ("apps", "StatefulSet"): K8SPropagationPolicy.ORPHAN,
260
                                                      ("apps", "Deployment"): K8SPropagationPolicy.ORPHAN,
261
                                                      ("storage.k8s.io", "StorageClass"): K8SPropagationPolicy.ORPHAN,
262
                                                      (None, "Pod"): K8SPropagationPolicy.BACKGROUND,
263
                                                      ("batch", "Job"): K8SPropagationPolicy.ORPHAN,
264
                                                      },
265
                                   default_includes=Globs(["*.yaml", "*.yml"], True),
266
                                   default_excludes=Globs([".*"], True),
267
                                   add_resources=self.add_resources,
268
                                   load_resources=self.api_load_resources,
269
                                   load_remote_resources=self.api_load_remote_resources,
270
                                   load_crds=self.api_load_crds,
271
                                   import_cluster_crds=self.api_import_cluster_crds,
272
                                   load_remote_crds=self.api_load_remote_crds,
273
                                   add_transformer=self.api_add_transformer,
274
                                   remove_transformer=self.api_remove_transformer,
275
                                   add_validator=self.api_add_validator,
276
                                   remove_validator=self.api_remove_validator,
277
                                   add_manifest_patcher=self.api_add_manifest_patcher,
278
                                   add_resource_filter=self.api_add_resource_filter,
279
                                   remove_resource_filter=self.api_remove_resource_filter,
280
                                   get_api_versions=self.get_api_versions,
281
                                   create_resource=self.create_resource,
282
                                   disable_client_patches=disable_client_patches,
283
                                   field_validation=field_validation,
284
                                   field_validation_warn_fatal=field_validation_warn_fatal,
285
                                   field_validation_warnings=0,
286
                                   resource_generator=self.resource_generator,
287
                                   resource=self.resource,
288
                                   conflict_retry_delay=0.3,
289
                                   _k8s=self,
290
                                   )
291
        context.k8s = dict(default_includes=Globs(context.globals.k8s.default_includes),
5✔
292
                           default_excludes=Globs(context.globals.k8s.default_excludes)
293
                           )
294
        self.api_add_validator(final_resource_validator)
5✔
295
        # Project hooks are always installed; they gate at runtime on
296
        # ``"project" in self.context.globals``.
297
        self.api_add_manifest_patcher(self._project_annotation_patcher)
5✔
298
        self.api_add_resource_filter(self._project_resource_filter)
5✔
299

300
    def handle_init(self):
5✔
301
        pass
5✔
302

303
    def handle_start(self):
5✔
304
        self.context.kubeconfig.register_change_notifier(self._kubeconfig_changed)
5✔
305
        self.setup_client()
5✔
306

307
    def _kubeconfig_changed(self):
5✔
308
        self.setup_client()
×
309

310
    def _get_kubernetes_client_version(self):
5✔
311
        return pkg_version("kubernetes").split(".")
5✔
312

313
    def setup_client(self):
5✔
314
        k8s = self.context.k8s
5✔
315
        if "server_version" not in k8s:
5!
316
            self._setup_client()
5✔
317

318
        server_minor = k8s.server_version[1]
5✔
319

320
        logger.info("Using Kubernetes client version =~%s.0 for server version %s",
5✔
321
                    server_minor, ".".join(k8s.server_version))
322
        pkg_dir = install_python_k8s_client(self.context.app.run_passthrough_capturing, server_minor, logger,
5✔
323
                                            stdout_logger, stderr_logger, k8s.disable_client_patches)
324

325
        modules_to_delete = []
5✔
326
        for k, v in sys.modules.items():
5✔
327
            if k == "kubernetes" or k.startswith("kubernetes."):
5✔
328
                modules_to_delete.append(k)
5✔
329
        for k in modules_to_delete:
5✔
330
            del sys.modules[k]
5✔
331

332
        logger.info("Adding sys.path reference to %s", pkg_dir)
5✔
333
        sys.path.insert(0, str(pkg_dir))
5✔
334
        self.embedded_pkg_version = self._get_kubernetes_client_version()
5✔
335
        logger.info("Switching to Kubernetes client version %s", ".".join(self.embedded_pkg_version))
5✔
336
        self._setup_client()
5✔
337

338
        self.validator = k8s_schema.make_validator(self.context)
5✔
339

340
    def _setup_client(self):
5✔
341
        from kubernetes import client
5✔
342
        from kubernetes.client.rest import RESTResponse
5✔
343

344
        # Upstream v35 client's exceptions.py reads ``http_resp.headers`` but
345
        # RESTResponse only defines ``getheaders()``, so any failed API call
346
        # that takes the default (``_preload_content=True``) wrapping path
347
        # raises ``AttributeError`` inside ``ApiException.__init__`` instead
348
        # of surfacing the actual error. Alias the attribute once.
349
        if not hasattr(RESTResponse, "headers"):
5!
350
            RESTResponse.headers = property(lambda self: self.getheaders())
5✔
351

352
        context = self.context
5✔
353
        k8s = context.k8s
5✔
354

355
        k8s.client = self._setup_k8s_client()
5✔
356
        version = client.VersionApi(k8s.client).get_code()
5✔
357
        # Strip vendor-specific suffixes so OpenAPI lookups hit upstream tags.
358
        # EKS/GKE use a dash (e.g. v1.28.3-eks-..., v1.28.3-gke.100);
359
        # k3s uses a plus sign (e.g. v1.35.3+k3s1).
360
        git_version = version.git_version.split("-")[0].split("+")[0]
5✔
361

362
        k8s.server_version = git_version[1:].split(".")
5✔
363
        k8s.server_git_version = git_version
5✔
364

365
        logger.info("Found Kubernetes %s on %s", k8s.server_git_version, k8s.client.configuration.host)
5✔
366

367
        K8SResource._k8s_client_version = normalize_pkg_version(pkg_version("kubernetes"))
5✔
368
        K8SResource._k8s_field_validation = k8s.field_validation
5✔
369
        K8SResource._k8s_field_validation_patched = not k8s.disable_client_patches
5✔
370
        K8SResource._logger = self.logger
5✔
371
        K8SResource._api_warnings = self._api_warnings
5✔
372

373
    def _api_warnings(self, resource, warn):
5✔
374
        k8s = self.context.k8s
5✔
375
        self.context.globals.k8s.field_validation_warnings += 1
5✔
376

377
        log = self.logger.warning
5✔
378
        if k8s.field_validation_warn_fatal:
5✔
379
            log = self.logger.error
5✔
380

381
        log("FAILED FIELD VALIDATION on resource %s from %s: %s", resource, resource.source, warn)
5✔
382

383
    def handle_before_dir(self, cwd: Path):
5✔
384
        context = self.context
5✔
385
        context.k8s.default_includes = Globs(context.k8s.default_includes)
5✔
386
        context.k8s.default_excludes = Globs(context.k8s.default_excludes)
5✔
387
        context.k8s.includes = Globs(context.k8s.default_includes)
5✔
388
        context.k8s.excludes = Globs(context.k8s.default_excludes)
5✔
389

390
    def handle_after_dir(self, cwd: Path):
5✔
391
        context = self.context
5✔
392
        k8s = context.k8s
5✔
393

394
        for f in scan_dir(logger, cwd, lambda d: d.is_file(), k8s.excludes, k8s.includes):
5✔
395
            p = cwd / f.name
5✔
396
            display_p = context.app.display_path(p)
5✔
397
            logger.debug("Adding Kubernetes manifest from %s", display_p)
5✔
398

399
            manifests = load_file(logger, p, FileType.YAML, display_p,
5✔
400
                                  self._template_engine,
401
                                  {"ktor": context}
402
                                  )
403

404
            for manifest in manifests:
5✔
405
                if manifest:
5!
406
                    self.add_resource(manifest, display_p)
5✔
407

408
    def resource_generator(self):
5✔
409
        for r in self.resources.values():
5✔
410
            if all(f(r) for f in self._resource_filters):
5!
411
                yield r
5✔
412

413
    def resource(self, manifest, source=None):
5✔
414
        from kubernetes import client
5✔
415
        if not source:
5!
416
            source = calling_frame_source()
5✔
417
        if isinstance(manifest, str):
5✔
418
            docs = [m for m in parse_yaml_docs(manifest, source) if m]
5✔
419
            if len(docs) != 1:
5!
420
                raise ValueError(f"ktor.k8s.resource() expects a single manifest document, got {len(docs)} from {source}")
×
421
            manifest = docs[0]
5✔
422
        res = self._create_resource(manifest, source)
5✔
423
        res.rdef.populate_api(client, self.context.k8s.client)
5✔
424
        return res
5✔
425

426
    def handle_apply(self):
5✔
427
        context = self.context
5✔
428
        k8s = context.k8s
5✔
429

430
        self._validate_resources()
5✔
431
        self._compute_project_scope()
5✔
432

433
        if "project" in context.globals:
5✔
434
            if self._project_ensure_state_namespace():
5!
435
                self._project_acquire_lease()
5✔
436
                self._project_start_renewal()
5✔
437
                self._project_read_prior_state()
5✔
438
                self._project_write_pre_apply()
5✔
439
                self._project_check_renewal()
5✔
440

441
        cmd = context.app.args.command
5✔
442
        file_name = context.app.args.file
5✔
443
        file_format = context.app.args.output_format
5✔
444
        dry_run = context.app.args.dry_run
5✔
445
        dump = cmd == "dump"
5✔
446

447
        status_msg = f"{' (dump only)' if dump else ' (dry run)' if dry_run else ''}"
5✔
448
        if dump:
5✔
449
            logger.info("Will dump the changes into a file %s in %s format", file_name or "<stdout>", file_format)
5✔
450

451
        patch_field_excludes = [re.compile(e) for e in context.globals.k8s.patch_field_excludes]
5✔
452
        dump_results = []
5✔
453
        total_created, total_patched, total_deleted = 0, 0, 0
5✔
454
        for resource in k8s.resource_generator():
5✔
455
            if dump:
5✔
456
                resource_id = {"apiVersion": resource.api_version,
5✔
457
                               "kind": resource.kind,
458
                               "name": resource.name
459
                               }
460

461
                def patch_func(patch):
5✔
462
                    if resource.rdef.namespaced:
5✔
463
                        resource_id["namespace"] = resource.namespace
5✔
464
                    method_descriptor = {"method": "patch",
5✔
465
                                         "resource": resource_id,
466
                                         "body": patch
467
                                         }
468
                    dump_results.append(method_descriptor)
5✔
469
                    return resource.manifest
5✔
470

471
                def create_func():
5✔
472
                    method_descriptor = {"method": "create",
5✔
473
                                         "body": resource.manifest}
474
                    dump_results.append(method_descriptor)
5✔
475
                    return resource.manifest
5✔
476

477
                def delete_func(*, propagation_policy):
5✔
478
                    method_descriptor = {"method": "delete",
×
479
                                         "resource": resource_id,
480
                                         "propagation_policy": propagation_policy.policy
481
                                         }
482
                    dump_results.append(method_descriptor)
×
483
                    return None
×
484
            else:
485
                patch_func = partial(resource.patch, patch_type=K8SResourcePatchType.JSON_PATCH, dry_run=dry_run)
5✔
486
                create_func = partial(resource.create, dry_run=dry_run)
5✔
487
                delete_func = partial(resource.delete, dry_run=dry_run)
5✔
488

489
            created, patched, deleted, result = self._apply_resource(dry_run,
5✔
490
                                                                     patch_field_excludes,
491
                                                                     resource,
492
                                                                     patch_func,
493
                                                                     create_func,
494
                                                                     delete_func,
495
                                                                     status_msg)
496

497
            total_created += created
5✔
498
            total_patched += patched
5✔
499
            total_deleted += deleted
5✔
500

501
        if ((dump or dry_run) and
5✔
502
                k8s.field_validation_warn_fatal and self.context.globals.k8s.field_validation_warnings):
503
            msg = ("There were %d field validation warnings and the warnings are fatal!" %
5✔
504
                   self.context.globals.k8s.field_validation_warnings)
505
            logger.fatal(msg)
5✔
506
            raise RuntimeError(msg)
5✔
507

508
        if dump:
5✔
509
            file = open(file_name, "w") if file_name else sys.stdout
5✔
510
            try:
5✔
511
                if file_format in ("json", "json-pretty"):
5✔
512
                    json.dump(dump_results, file, sort_keys=True,
5✔
513
                              indent=4 if file_format == "json-pretty" else None)
514
                else:
515
                    yaml.safe_dump(dump_results, file)
5✔
516
            finally:
517
                if file_name:
5✔
518
                    file.close()
5✔
519
        else:
520
            self._summary = total_created, total_patched, total_deleted
5✔
521

522
    def handle_cleanup(self):
5✔
523
        if "project" not in self.context.globals:
5✔
524
            return
5✔
525
        if self._project_prior_state is None:
5✔
526
            # handle_apply short-circuited (dry-run against missing namespace).
527
            return
5✔
528
        self._project_check_renewal()
5✔
529
        # A raise from _project_delete_obsolete keeps finalized=false so the
530
        # next run's conservative union re-considers the not-yet-deleted keys.
531
        self._project_delete_obsolete()
5✔
532
        self._project_write_finalize()
5✔
533

534
    def handle_shutdown(self):
5✔
535
        try:
5✔
536
            self._project_stop_renewal()
5✔
537
        finally:
538
            self._project_release_lease()
5✔
539

540
    def handle_summary(self):
5✔
541
        total_created, total_patched, total_deleted = self._summary
5✔
542
        logger.info("Created %d, patched %d, deleted %d resources", total_created, total_patched, total_deleted)
5✔
543

544
    def api_load_resources(self, path: Path, file_type: str):
5✔
545
        return self.add_local_resources(path, FileType[file_type.upper()])
×
546

547
    def api_load_remote_resources(self, url: str, file_type: str, file_category=None):
5✔
548
        return self.add_remote_resources(url, FileType[file_type.upper()], sub_category=file_category)
×
549

550
    def api_load_crds(self, path: Path, file_type: str):
5✔
551
        return self.add_local_crds(path, FileType[file_type.upper()])
5✔
552

553
    def api_load_remote_crds(self, url: str, file_type: str, file_category=None):
5✔
554
        return self.add_remote_crds(url, FileType[file_type.upper()], sub_category=file_category)
5✔
555

556
    def api_import_cluster_crds(self):
5✔
557
        context = self.context
×
558
        k8s = context.k8s
×
559
        client = k8s.client
×
560
        from kubernetes import client as client_module
×
561

562
        api = client_module.ApiextensionsV1Api(client)
×
563
        crds = api.list_custom_resource_definition(watch=False)
×
564
        for crd in crds.items:
×
565
            manifest = client.sanitize_for_serialization(crd)
×
566
            manifest["apiVersion"] = "apiextensions.k8s.io/v1"
×
567
            manifest["kind"] = "CustomResourceDefinition"
×
568
            self.add_crd(manifest)
×
569

570
    def api_add_transformer(self, transformer):
5✔
571
        if transformer not in self._transformers:
5!
572
            self._transformers.append(transformer)
5✔
573

574
    def api_add_validator(self, validator):
5✔
575
        if validator not in self._validators:
5!
576
            self._validators.append(validator)
5✔
577

578
    def api_add_manifest_patcher(self, patcher):
5✔
579
        if patcher not in self._manifest_patchers:
5!
580
            self._manifest_patchers.append(patcher)
5✔
581

582
    def api_add_resource_filter(self, pred):
5✔
583
        if pred not in self._resource_filters:
5✔
584
            self._resource_filters.append(pred)
5✔
585

586
    def api_remove_resource_filter(self, pred):
5✔
587
        if pred in self._resource_filters:
5✔
588
            self._resource_filters.remove(pred)
5✔
589

590
    def _project_annotation_patcher(self, manifest, resource_description):
5✔
591
        """Stamp every manifest with its ``kubernator.io/project`` annotation.
592
        No-op when the project plugin has not been registered."""
593
        if "project" not in self.context.globals:
5✔
594
            return manifest
5✔
595
        project = self.context.app.project
5✔
596
        if project is None:
5✔
597
            raise RuntimeError(
5✔
598
                "%s: project plugin active but no project set in this context" %
599
                (resource_description,))
600
        metadata = manifest.setdefault("metadata", {})
5✔
601
        annotations = metadata.setdefault("annotations", {})
5✔
602
        annotations[PROJECT_ANNOTATION] = project
5✔
603
        return manifest
5✔
604

605
    def _project_resource_filter(self, resource):
5✔
606
        """Scope resources by the cached ``in_scope`` set derived from
607
        ``-I``/``-X``. No-op when the project plugin is not active or when
608
        neither flag was supplied."""
609
        if self._in_scope_projects is None:
5✔
610
            return True
5✔
611
        return resource.project in self._in_scope_projects
5✔
612

613
    def _known_projects(self):
5✔
614
        """Projects present on currently-loaded manifests. Commit 8 extends
615
        this with prior projects recovered from the state Secret."""
616
        return {r.project for r in self.resources.values() if r.project}
5✔
617

618
    def _compute_project_scope(self):
5✔
619
        """Populate ``self._in_scope_projects`` — called once per apply run.
620

621
        * project plugin not registered → ``None`` (filter is a no-op).
622
        * no ``-I`` / ``-X`` given → ``None`` (every sub-project is in scope
623
          implicitly; annotation patcher still stamps, but no filtering).
624
        * otherwise, compute the in-scope set against ``known_projects``,
625
          validating that every pattern matches at least one known project.
626
        """
627
        self._in_scope_projects = None
5✔
628
        if "project" not in self.context.globals:
5✔
629
            return
5✔
630
        args = self.context.app.args
5✔
631
        includes = list(args.include_project or [])
5✔
632
        excludes = list(args.exclude_project or [])
5✔
633
        if not includes and not excludes:
5✔
634
            return
5✔
635

636
        known = self._known_projects()
5✔
637

638
        def _unmatched(patterns, flag):
5✔
639
            bad = [p for p in patterns
5✔
640
                   if not any(_project_matches(k, p) for k in known)]
641
            if bad:
5✔
642
                return ("%s values match no known project: %s (known: %s)"
5✔
643
                        % (flag, sorted(bad), sorted(known)))
644
            return None
5✔
645

646
        errs = [m for m in (_unmatched(includes, "-I/--include-project"),
5✔
647
                            _unmatched(excludes, "-X/--exclude-project"))
648
                if m]
649
        if errs:
5✔
650
            raise RuntimeError("; ".join(errs))
5✔
651

652
        if includes:
5✔
653
            candidates = {p for p in known
5✔
654
                          if any(_project_matches(p, i) for i in includes)}
655
        else:
656
            candidates = set(known)
5✔
657
        self._in_scope_projects = {p for p in candidates
5✔
658
                                   if not any(_project_matches(p, x) for x in excludes)}
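The include/exclude rule computed above can be sketched in isolation. This is a minimal illustration and not the plugin's code: `project_matches` is a hypothetical stand-in for `_project_matches` (its definition is not shown in this listing) and assumes that a pattern selects a project plus its dot-joined sub-tree. `compute_scope` and the sample project names are likewise invented for the example.

```python
def project_matches(project: str, pattern: str) -> bool:
    # Assumption: a pattern matches the project itself or any dot-joined descendant.
    return project == pattern or project.startswith(pattern + ".")


def compute_scope(known, includes, excludes):
    # candidates = known ∩ includes (all known projects when no -I is given)
    if includes:
        candidates = {p for p in known
                      if any(project_matches(p, i) for i in includes)}
    else:
        candidates = set(known)
    # in_scope = candidates − excludes
    return {p for p in candidates
            if not any(project_matches(p, x) for x in excludes)}


known = {"shop", "shop.api", "shop.api.db", "shop.web"}
scoped = compute_scope(known, includes=["shop.api"], excludes=["shop.api.db"])
```

Under the prefix-matching assumption, including `shop.api` selects that project and its descendants, and excluding `shop.api.db` then removes the one descendant again.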
659

660
    def _project_config(self):
5✔
661
        return self.context.globals.project
5✔
662

663
    def _project_root(self) -> str:
5✔
664
        return self._project_config().root
5✔
665

666
    def _project_state_namespace(self) -> str:
5✔
667
        return self._project_config().state_namespace
5✔
668

669
    def _project_cleanup_enabled(self) -> bool:
5✔
670
        return bool(self._project_config().cleanup)
5✔
671

672
    def _core_api(self):
5✔
673
        from kubernetes import client as k8s_client_module
5✔
674
        return k8s_client_module.CoreV1Api(self.context.k8s.client)
5✔
675

676
    def _coord_api(self):
5✔
677
        from kubernetes import client as k8s_client_module
5✔
678
        return k8s_client_module.CoordinationV1Api(self.context.k8s.client)
5✔
679

680
    def _dry_run_arg(self):
5✔
681
        return "All" if self.context.app.args.dry_run else None
5✔
682

683
    @_pretty_api_exc
5✔
684
    def _project_ensure_state_namespace(self) -> bool:
5✔
685
        """Ensure ``state_namespace`` exists. Return True if subsequent state
686
        operations (Lease, Secret) can proceed; return False on a dry-run
687
        invocation against a missing namespace — server-side dry-run would
688
        accept the namespace CREATE but not actually create it, causing every
689
        dependent in-namespace call to 404 confusingly.
690
        """
691
        from kubernetes import client as k8s_client_module
5✔
692
        from kubernetes.client.rest import ApiException
5✔
693

694
        ns = self._project_state_namespace()
5✔
695
        core = self._core_api()
5✔
696
        try:
5✔
697
            core.read_namespace(name=ns)
5✔
698
            return True
5✔
699
        except ApiException as e:
5✔
700
            if e.status != 404:
5!
NEW
701
                raise
×
702
        if self.context.app.args.dry_run:
5!
NEW
703
            logger.info(
×
704
                "State namespace %s does not exist; it would be created on a "
705
                "committed run. Skipping dry-run for Lease and state Secret to "
706
                "avoid a confusing validation cascade.", ns)
NEW
707
            return False
×
708
        logger.info("Creating state namespace %s", ns)
5✔
709
        body = k8s_client_module.V1Namespace(
5✔
710
            metadata=k8s_client_module.V1ObjectMeta(name=ns))
711
        try:
5✔
712
            core.create_namespace(body=body)
5✔
NEW
713
        except ApiException as e:
×
NEW
714
            if e.status == 409:
×
NEW
715
                return True  # Raced; already exists
×
NEW
716
            raise
×
717
        return True
5✔
718

719
    @_pretty_api_exc
5✔
720
    def _project_acquire_lease(self):
5✔
721
        """Acquire a per-root Lease in ``state_namespace``. Fail fast on a
722
        live lease; take over a stale one using a resourceVersion precondition.
723
        """
724
        from kubernetes import client as k8s_client_module
5✔
725
        from kubernetes.client.rest import ApiException
5✔
726

727
        if self.context.app.args.dry_run:
5!
NEW
728
            logger.info("Skipping Lease acquisition in dry-run mode")
×
NEW
729
            return
×
730

731
        self._project_lease_identity = _lease_identity()
5✔
732
        lease_name = _lease_name(self._project_root())
5✔
733
        ns = self._project_state_namespace()
5✔
734
        coord = self._coord_api()
5✔
735

736
        for _ in range(PROJECT_LEASE_ACQUIRE_ATTEMPTS):
5!
737
            try:
5✔
738
                current = coord.read_namespaced_lease(name=lease_name, namespace=ns)
5✔
739
            except ApiException as e:
5✔
740
                if e.status != 404:
5!
NEW
741
                    raise
×
742
                current = None
5✔
743

744
            now = datetime.now(timezone.utc)
5✔
745
            if current is None:
5!
746
                body = k8s_client_module.V1Lease(
5✔
747
                    metadata=k8s_client_module.V1ObjectMeta(name=lease_name, namespace=ns),
748
                    spec=k8s_client_module.V1LeaseSpec(
749
                        holder_identity=self._project_lease_identity,
750
                        lease_duration_seconds=PROJECT_LEASE_DURATION_SECONDS,
751
                        acquire_time=now,
752
                        renew_time=now,
753
                    ),
754
                )
755
                try:
5✔
756
                    coord.create_namespaced_lease(namespace=ns, body=body)
5✔
NEW
757
                except ApiException as e:
×
NEW
758
                    if e.status == 409:
×
NEW
759
                        continue  # Raced — re-read next iteration
×
NEW
760
                    raise
×
761
                self._project_lease_acquired = True
5✔
762
                logger.info("Acquired project Lease %s in %s as %s",
5✔
763
                            lease_name, ns, self._project_lease_identity)
764
                return
5✔
765

NEW
766
            spec = current.spec
×
NEW
767
            holder = spec.holder_identity or "<unknown>"
×
NEW
768
            duration = spec.lease_duration_seconds or PROJECT_LEASE_DURATION_SECONDS
×
NEW
769
            renew = spec.renew_time or spec.acquire_time
×
NEW
770
            if renew is not None and renew.tzinfo is None:
×
NEW
771
                renew = renew.replace(tzinfo=timezone.utc)
×
NEW
772
            expiry = (renew + timedelta(seconds=duration)) if renew else now
×
NEW
773
            if expiry >= now:
×
NEW
774
                raise RuntimeError(
×
775
                    "Lease %s in %s is held by %r (expires %s); "
776
                    "another kubernator run is active. Retry later."
777
                    % (lease_name, ns, holder, expiry.isoformat()))
NEW
778
            logger.warning("Lease %s is stale (holder=%r, expired at %s); taking over",
×
779
                           lease_name, holder, expiry.isoformat())
NEW
780
            current.spec.holder_identity = self._project_lease_identity
×
NEW
781
            current.spec.lease_duration_seconds = PROJECT_LEASE_DURATION_SECONDS
×
NEW
782
            current.spec.acquire_time = now
×
NEW
783
            current.spec.renew_time = now
×
NEW
784
            try:
×
NEW
785
                coord.replace_namespaced_lease(
×
786
                    name=lease_name, namespace=ns, body=current)
NEW
787
                self._project_lease_acquired = True
×
NEW
788
                logger.info("Took over stale Lease %s in %s as %s",
×
789
                            lease_name, ns, self._project_lease_identity)
NEW
790
                return
×
NEW
791
            except ApiException as e:
×
NEW
792
                if e.status == 409:
×
NEW
793
                    continue  # resourceVersion precondition failed — retry
×
NEW
794
                raise
×
NEW
795
        raise RuntimeError(
×
796
            "Failed to acquire Lease %s in %s after %d attempts"
797
            % (lease_name, ns, PROJECT_LEASE_ACQUIRE_ATTEMPTS))
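The staleness decision inside the acquire loop can be read as a small pure function. The sketch below mirrors the arithmetic shown above using only the standard library. `DEFAULT_DURATION_SECONDS` is a hypothetical placeholder for `PROJECT_LEASE_DURATION_SECONDS` (its real value is not visible here), and `lease_expiry` / `lease_is_live` are illustrative names.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_DURATION_SECONDS = 60  # hypothetical stand-in, real value not shown


def lease_expiry(renew_time, acquire_time, duration_seconds, now):
    # Prefer renewTime, fall back to acquireTime.
    renew = renew_time or acquire_time
    # Naive timestamps are interpreted as UTC, as in the listing.
    if renew is not None and renew.tzinfo is None:
        renew = renew.replace(tzinfo=timezone.utc)
    duration = duration_seconds or DEFAULT_DURATION_SECONDS
    # With no timestamp at all, expiry collapses to "now".
    return renew + timedelta(seconds=duration) if renew else now


def lease_is_live(renew_time, acquire_time, duration_seconds, now):
    # A lease whose expiry is at or after "now" is treated as held.
    return lease_expiry(renew_time, acquire_time, duration_seconds, now) >= now


now = datetime(2026, 4, 19, 12, 0, 0, tzinfo=timezone.utc)
fresh = lease_is_live(now - timedelta(seconds=30), None, 60, now)
stale = lease_is_live(now - timedelta(seconds=120), None, 60, now)
```

Note that, following the comparison in the listing, a Lease with neither `renew_time` nor `acquire_time` counts as live, because its expiry equals `now`.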
798

799
    def _project_renew_loop(self):
5✔
800
        """Greenlet body: every ``PROJECT_LEASE_RENEW_INTERVAL_SECONDS``, bump the Lease's
801
        renewTime. On any failure mode that indicates lost ownership, sets
802
        ``_project_lease_abort`` so the main greenlet can notice."""
803
        from kubernetes.client.rest import ApiException
5✔
804

805
        lease_name = _lease_name(self._project_root())
5✔
806
        ns = self._project_state_namespace()
5✔
807
        coord = self._coord_api()
5✔
808

809
        while True:
5✔
810
            gevent.sleep(PROJECT_LEASE_RENEW_INTERVAL_SECONDS)
5✔
NEW
811
            try:
×
NEW
812
                current = coord.read_namespaced_lease(name=lease_name, namespace=ns)
×
NEW
813
            except ApiException as e:
×
NEW
814
                api_exc_normalize_body(e)
×
NEW
815
                api_exc_format_body(e)
×
NEW
816
                logger.error("Lease %s renewal read failed: %s", lease_name, e)
×
NEW
817
                self._project_lease_abort = True
×
NEW
818
                return
×
NEW
819
            if (current.spec.holder_identity or "") != self._project_lease_identity:
×
NEW
820
                logger.error("Lease %s identity mismatch (expected %r, got %r); aborting",
×
821
                             lease_name, self._project_lease_identity,
822
                             current.spec.holder_identity)
NEW
823
                self._project_lease_abort = True
×
NEW
824
                return
×
NEW
825
            current.spec.renew_time = datetime.now(timezone.utc)
×
NEW
826
            try:
×
NEW
827
                coord.replace_namespaced_lease(
×
828
                    name=lease_name, namespace=ns, body=current)
NEW
829
                logger.debug("Renewed Lease %s", lease_name)
×
NEW
830
            except ApiException as e:
×
NEW
831
                api_exc_normalize_body(e)
×
NEW
832
                api_exc_format_body(e)
×
NEW
833
                logger.error("Lease %s renewal replace failed (%s); aborting", lease_name, e)
×
NEW
834
                self._project_lease_abort = True
×
NEW
835
                return
×
836

837
    def _project_start_renewal(self):
5✔
838
        if not self._project_lease_acquired:
5✔
839
            return
5✔
840
        self._project_lease_renewer = gevent.spawn(self._project_renew_loop)
5✔
841

842
    def _project_stop_renewal(self):
5✔
843
        g = self._project_lease_renewer
5✔
844
        self._project_lease_renewer = None
5✔
845
        if g is not None and not g.dead:
5✔
846
            g.kill(block=False)
5✔
847

848
    def _project_release_lease(self):
5✔
849
        from kubernetes.client.rest import ApiException
5✔
850

851
        if not self._project_lease_acquired:
5✔
852
            return
5✔
853
        lease_name = _lease_name(self._project_root())
5✔
854
        ns = self._project_state_namespace()
5✔
855
        coord = self._coord_api()
5✔
856
        try:
5✔
857
            coord.delete_namespaced_lease(name=lease_name, namespace=ns)
5✔
NEW
858
            logger.info("Released project Lease %s", lease_name)
1✔
NEW
859
        except ApiException as e:
×
NEW
860
            if e.status == 404:
×
NEW
861
                logger.debug("Lease %s already gone on release", lease_name)
×
862
            else:
NEW
863
                api_exc_normalize_body(e)
×
NEW
864
                api_exc_format_body(e)
×
NEW
865
                logger.warning("Failed to delete Lease %s: %s", lease_name, e)
×
866
        finally:
NEW
867
            self._project_lease_acquired = False
1✔
868

869
    def _project_check_renewal(self):
5✔
870
        if self._project_lease_abort:
5✔
871
            raise RuntimeError(
5✔
872
                "Project Lease was lost during run (renewal failure). Aborting to "
873
                "avoid clobbering another run's state.")
874

875
    # --- State Secret I/O --------------------------------------------------
876

877
    @_pretty_api_exc
5✔
878
    def _project_read_prior_state(self):
5✔
879
        from kubernetes.client.rest import ApiException
5✔
880

881
        ns = self._project_state_namespace()
5✔
882
        root = self._project_root()
5✔
883
        name = _state_secret_name(root)
5✔
884
        core = self._core_api()
5✔
885
        try:
5✔
886
            secret = core.read_namespaced_secret(name=name, namespace=ns)
5✔
NEW
887
        except ApiException as e:
1✔
NEW
888
            if e.status != 404:
1!
NEW
889
                raise
×
NEW
890
            logger.info("No prior project state Secret %s — baseline will be recorded this run", name)
1✔
NEW
891
            self._project_prior_state = _empty_state_payload()
1✔
NEW
892
            self._project_state_rv = None
1✔
NEW
893
            return
1✔
NEW
894
        self._project_state_rv = secret.metadata.resource_version
1✔
NEW
895
        data = secret.data or {}
1✔
NEW
896
        encoded = data.get(PROJECT_STATE_SECRET_DATA_KEY)
1✔
NEW
897
        if not encoded:
1!
NEW
898
            logger.warning("Project state Secret %s is missing key %r — treating as baseline",
×
899
                           name, PROJECT_STATE_SECRET_DATA_KEY)
NEW
900
            self._project_prior_state = _empty_state_payload()
×
NEW
901
            return
×
NEW
902
        try:
1✔
NEW
903
            payload = _decode_state(encoded)
1✔
NEW
904
        except Exception as e:
×
NEW
905
            raise RuntimeError(
×
906
                "Failed to decode project state Secret %s: %s" % (name, e)) from e
NEW
907
        if payload.get(_STATE_VERSION) != PROJECT_STATE_VERSION:
1!
NEW
908
            raise RuntimeError(
×
909
                "Project state Secret %s has unsupported version %r (expected %r)"
910
                % (name, payload.get(_STATE_VERSION), PROJECT_STATE_VERSION))
NEW
911
        if not payload.get(_STATE_FINALIZED, True):
1!
NEW
912
            logger.warning(
×
913
                "Project state Secret %s was not finalized by the previous run "
914
                "(pending=%s). The prior run may have crashed; cleanup will "
915
                "conservatively consider both resources and pending.",
916
                name, sorted(payload.get(_STATE_PENDING, {}).keys()))
NEW
917
        self._project_prior_state = payload
1✔
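The `_encode_state` / `_decode_state` helpers used for the Secret payload are not part of this listing; the change summary only states that the Secret holds gzipped JSON. The sketch below is one plausible round-trip under that description, not the project's implementation. The payload keys (`version`, `finalized`, `resources`, `pending`) are assumed from the summary, and the base64 step reflects that Kubernetes Secret `data` values are base64-encoded.

```python
import base64
import gzip
import json


def encode_state(payload: dict) -> str:
    # Sorted keys and compact separators keep the JSON byte-stable across runs.
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    # mtime=0 removes the timestamp from the gzip header (Python 3.8+).
    return base64.b64encode(gzip.compress(raw, mtime=0)).decode("ascii")


def decode_state(encoded: str) -> dict:
    return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf-8"))


state = {
    "version": 1,  # hypothetical version number
    "finalized": True,
    "resources": {"shop.api": [{"kind": "ConfigMap", "name": "a", "namespace": "default"}]},
    "pending": {},
}
encoded = encode_state(state)
restored = decode_state(encoded)
```

Keeping the encoding deterministic means an unchanged state produces an identical Secret body, which makes diffs and no-op writes easier to reason about.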
918

919
    def _project_compute_new_intent(self):
5✔
920
        """Group current in-scope resources by project, as sorted ident lists.
921

922
        Returns a dict mapping sub-project name → sorted list of ident dicts,
923
        restricted to in-scope projects when a scope filter is active."""
924
        current = {}
5✔
925
        for r in self.resources.values():
5✔
926
            p = r.project
5✔
927
            if not p:
5✔
928
                continue
5✔
929
            current.setdefault(p, []).append(_resource_ident(r))
5✔
930
        # Apply scope filter (if active) to restrict what gets recorded.
931
        if self._in_scope_projects is not None:
5✔
932
            current = {p: v for p, v in current.items() if p in self._in_scope_projects}
5✔
933
        # Sort each project's idents for stable Secret content across runs.
934
        for p in current:
5✔
935
            current[p].sort(key=_ident_key)
5✔
936
        return current
5✔
937

938
    def _project_merge_resources(self, prior_resources: dict, new_intent: dict) -> dict:
5✔
939
        """Return the finalized ``resources`` map: in-scope sub-projects come
940
        from ``new_intent``; out-of-scope sub-projects are preserved verbatim."""
941
        merged = {
5✔
942
            p: list(idents) for p, idents in prior_resources.items()
943
            if self._in_scope_projects is not None and p not in self._in_scope_projects
944
        }
945
        for p, idents in new_intent.items():
5✔
946
            merged[p] = list(idents)
5✔
947
        return merged
5✔
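The merge rule is easiest to see on plain dictionaries. The following standalone sketch restates the logic of `_project_merge_resources` as a free function; `merge_resources` and the sample project names are illustrative only, with the scope set passed in explicitly instead of being read from `self`.

```python
def merge_resources(prior_resources, new_intent, in_scope):
    # Out-of-scope sub-projects survive verbatim, but only when a scope is active.
    merged = {
        p: list(idents) for p, idents in prior_resources.items()
        if in_scope is not None and p not in in_scope
    }
    # In-scope sub-projects are replaced by what this run intends to own.
    for p, idents in new_intent.items():
        merged[p] = list(idents)
    return merged


prior = {"shop.api": ["old-a"], "shop.web": ["w"]}
intent = {"shop.api": ["new-a"]}
scoped_merge = merge_resources(prior, intent, in_scope={"shop.api"})
unscoped_merge = merge_resources(prior, intent, in_scope=None)
```

With no scope filter (`in_scope=None`) nothing from the prior state is carried over, so a sub-project missing from the new intent drops out of the finalized map.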
948

949
    @_pretty_api_exc
5✔
950
    def _project_write_state(self, payload: dict):
5✔
951
        """Create or replace the state Secret. Uses the cached ``resource_version``
952
        from the initial read (or prior write) so we don't re-GET every time;
953
        falls back to a fresh read on a 409 precondition failure."""
954
        from kubernetes import client as k8s_client_module
5✔
955
        from kubernetes.client.rest import ApiException
5✔
956

957
        ns = self._project_state_namespace()
5✔
958
        root = self._project_root()
5✔
959
        name = _state_secret_name(root)
5✔
960
        core = self._core_api()
5✔
961
        encoded = _encode_state(payload)
5✔
962
        dry_run = self._dry_run_arg()
5✔
963
        common_kwargs = {"dry_run": dry_run} if dry_run else {}
5✔
964

965
        def _body(rv):
5✔
966
            meta = k8s_client_module.V1ObjectMeta(
5✔
967
                name=name, namespace=ns,
968
                annotations={PROJECT_ROOT_ANNOTATION: root})
969
            if rv is not None:
5✔
970
                meta.resource_version = rv
5✔
971
            return k8s_client_module.V1Secret(
5✔
972
                metadata=meta,
973
                type=PROJECT_STATE_SECRET_TYPE,
974
                data={PROJECT_STATE_SECRET_DATA_KEY: encoded},
975
            )
976

977
        if self._project_state_rv is None:
5✔
978
            # Prior read returned 404 — create. Handle 409 (raced create) by
979
            # falling through to the replace branch.
980
            try:
5✔
981
                resp = core.create_namespaced_secret(
5✔
982
                    namespace=ns, body=_body(None), **common_kwargs)
983
                self._project_state_rv = resp.metadata.resource_version
5✔
984
                logger.debug("Created project state Secret %s (%d bytes payload)",
5✔
985
                             name, len(encoded))
986
                return
5✔
NEW
987
            except ApiException as e:
×
NEW
988
                if e.status != 409:
×
NEW
989
                    raise
×
990
                # Re-read to pick up the winner's resourceVersion.
NEW
991
                existing = core.read_namespaced_secret(name=name, namespace=ns)
×
NEW
992
                self._project_state_rv = existing.metadata.resource_version
×
993

994
        try:
5✔
995
            resp = core.replace_namespaced_secret(
5✔
996
                name=name, namespace=ns,
997
                body=_body(self._project_state_rv),
998
                **common_kwargs)
NEW
999
        except ApiException as e:
×
NEW
1000
            if e.status != 409:
×
NEW
1001
                raise
×
1002
            # Our cached resourceVersion is stale; re-read once and retry.
NEW
1003
            existing = core.read_namespaced_secret(name=name, namespace=ns)
×
NEW
1004
            self._project_state_rv = existing.metadata.resource_version
×
NEW
1005
            resp = core.replace_namespaced_secret(
×
1006
                name=name, namespace=ns,
1007
                body=_body(self._project_state_rv),
1008
                **common_kwargs)
1009
        self._project_state_rv = resp.metadata.resource_version
5✔
1010
        logger.debug("Replaced project state Secret %s (%d bytes payload)",
5✔
1011
                     name, len(encoded))
1012

1013
    def _project_write_pre_apply(self):
5✔
1014
        """Phase 1: record ``pending=new_intent`` with ``finalized=false`` while
1015
        keeping ``resources=prior_resources``. A crash before the Finalize
1016
        write leaves both visible so the next run's conservative union of
1017
        ``resources`` and ``pending`` still picks up everything we owned."""
1018
        self._project_new_intent = self._project_compute_new_intent()
5✔
1019
        payload = {
5✔
1020
            _STATE_VERSION: PROJECT_STATE_VERSION,
1021
            _STATE_FINALIZED: False,
1022
            _STATE_RESOURCES: dict(self._project_prior_state.get(_STATE_RESOURCES, {})),
1023
            _STATE_PENDING: self._project_new_intent,
1024
        }
1025
        self._project_write_state(payload)
5✔
1026

1027
    def _project_write_finalize(self):
5✔
1028
        prior_resources = self._project_prior_state.get(_STATE_RESOURCES, {})
5✔
1029
        merged_resources = self._project_merge_resources(
5✔
1030
            prior_resources, self._project_new_intent or {})
1031
        payload = {
5✔
1032
            _STATE_VERSION: PROJECT_STATE_VERSION,
1033
            _STATE_FINALIZED: True,
1034
            _STATE_RESOURCES: merged_resources,
1035
            _STATE_PENDING: {},
1036
        }
1037
        self._project_write_state(payload)
5✔
1038

1039
    def _project_compute_obsolete(self) -> list:
5✔
1040
        prior_resources = self._project_prior_state.get(_STATE_RESOURCES, {})
5✔
1041
        prior_pending = self._project_prior_state.get(_STATE_PENDING, {})
5✔
1042
        prior_finalized = self._project_prior_state.get(_STATE_FINALIZED, True)
5✔
1043

1044
        prior_projects = set(prior_resources.keys())
5✔
1045
        if not prior_finalized:
5✔
1046
            prior_projects |= set(prior_pending.keys())
5✔
1047

1048
        current_in_scope_keys = {
5✔
1049
            _ident_key(ident)
1050
            for idents in (self._project_new_intent or {}).values()
1051
            for ident in idents
1052
        }
1053

1054
        prior_in_scope_keys = {}
5✔
1055
        for p in prior_projects:
5✔
1056
            if self._in_scope_projects is not None and p not in self._in_scope_projects:
5✔
1057
                continue
5✔
1058
            for ident in prior_resources.get(p, []):
5✔
1059
                prior_in_scope_keys[_ident_key(ident)] = ident
5✔
1060
            if not prior_finalized:
5✔
1061
                for ident in prior_pending.get(p, []):
5✔
1062
                    prior_in_scope_keys.setdefault(_ident_key(ident), ident)
5✔
1063

1064
        to_delete = [ident for key, ident in prior_in_scope_keys.items()
5✔
1065
                     if key not in current_in_scope_keys]
1066
        to_delete.sort(key=_ident_key)
5✔
1067
        return to_delete
5✔
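A worked example may help with the obsolete-set computation, in particular the conservative union after an unfinalized run and the move-between-sub-projects case. The sketch below restates the logic on plain dictionaries. `ident_key` is a hypothetical stand-in for `_ident_key` (not shown here), assumed to ignore the project, and the literal state keys `resources`, `pending`, and `finalized` are assumed from the change summary.

```python
def ident_key(ident):
    # Assumption: identity is (group, kind, namespace, name), independent of project.
    return (ident.get("group") or "", ident["kind"],
            ident.get("namespace") or "", ident["name"])


def compute_obsolete(prior_state, new_intent, in_scope=None):
    resources = prior_state.get("resources", {})
    pending = prior_state.get("pending", {})
    finalized = prior_state.get("finalized", True)

    projects = set(resources)
    if not finalized:
        projects |= set(pending)  # crashed run: also consider its recorded intent

    current = {ident_key(i) for idents in new_intent.values() for i in idents}

    prior = {}
    for p in projects:
        if in_scope is not None and p not in in_scope:
            continue
        for i in resources.get(p, []):
            prior[ident_key(i)] = i
        if not finalized:
            for i in pending.get(p, []):
                prior.setdefault(ident_key(i), i)

    return sorted((i for k, i in prior.items() if k not in current), key=ident_key)


def cm(name):
    return {"kind": "ConfigMap", "name": name, "namespace": "default"}


prior_state = {
    "finalized": False,
    "resources": {"shop.api": [cm("a"), cm("b")]},
    "pending": {"shop.web": [cm("c")]},
}
new_intent = {"shop.web": [cm("a")]}  # "a" moved from shop.api to shop.web
obsolete = compute_obsolete(prior_state, new_intent)
```

Here `a` is preserved because its identity still appears in the new intent even though its project changed, while `b` and `c` are reported as obsolete.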
1068

1069
    def _project_delete_obsolete(self):
5✔
1070
        """Delete the resources that used to be ours but are no longer in
1071
        scope. Accumulates errors — the run fails after best-effort deletion
1072
        so the Finalize write is skipped and the next run retries."""
1073
        from kubernetes.client.rest import ApiException
5✔
1074

1075
        to_delete = self._project_compute_obsolete()
5✔
1076
        if not to_delete:
5✔
1077
            return
5✔
1078
        dry_run = self.context.app.args.dry_run
5✔
1079
        cleanup_enabled = self._project_cleanup_enabled()
5✔
1080
        if not cleanup_enabled:
5✔
1081
            preview = ["%s/%s/%s%s" % (i.get("group") or "core",
5✔
1082
                                       i["kind"], i["name"],
1083
                                       "." + i["namespace"] if i.get("namespace") else "")
1084
                       for i in to_delete]
1085
            logger.info(
5✔
1086
                "Project cleanup is disabled — %d obsolete resource(s) would be deleted "
1087
                "if cleanup=True: %s",
1088
                len(to_delete), preview)
1089
            return
5✔
1090

1091
        failures = []
5✔
1092
        for ident in to_delete:
5✔
1093
            group = ident.get("group") or ""
5✔
1094
            version = ident.get("version") or "v1"
5✔
1095
            api_version = "%s/%s" % (group, version) if group else version
5✔
1096
            manifest = {
5✔
1097
                "apiVersion": api_version,
1098
                "kind": ident["kind"],
1099
                "metadata": {"name": ident["name"]},
1100
            }
1101
            if ident.get("namespace"):
5!
1102
                manifest["metadata"]["namespace"] = ident["namespace"]
5✔
1103
            try:
5✔
1104
                res = self.resource(manifest)
5✔
NEW
1105
            except Exception as e:
×
NEW
1106
                logger.critical("Cannot resolve resource for cleanup %s: %s", ident, e)
×
NEW
1107
                failures.append((ident, e))
×
NEW
1108
                continue
×
1109
            try:
5✔
1110
                logger.info("Cleanup: deleting obsolete resource %s%s",
5✔
1111
                            res, " (dry run)" if dry_run else "")
1112
                res.delete(dry_run=dry_run, wait=False)
5✔
NEW
1113
            except ApiException as e:
×
NEW
1114
                if e.status == 404:
×
NEW
1115
                    logger.debug("Cleanup: %s already gone", res)
×
NEW
1116
                    continue
×
1117
                # res.delete is decorated with _normalize_api_exc upstream —
1118
                # body is already parsed; re-format for pretty output.
NEW
1119
                api_exc_format_body(e)
×
NEW
1120
                logger.critical("Cleanup: failed to delete %s: %s", res, e)
×
NEW
1121
                failures.append((ident, e))
×
1122
        if failures:
5!
NEW
1123
            raise RuntimeError(
×
1124
                "Project cleanup failed to delete %d resource(s); see CRITICAL log lines "
1125
                "above. Secret left with finalized=false — next run will retry." %
1126
                (len(failures),))
1127

1128
    def api_remove_transformer(self, transformer):
5✔
1129
        if transformer in self._transformers:
5✔
1130
            self._transformers.remove(transformer)
5✔
1131

1132
    def api_remove_validator(self, validator):
5✔
1133
        if validator in self._validators:
5✔
1134
            self._validators.remove(validator)
5✔
1135

1136
    def api_validation_error(self, msg, *args):
5✔
1137
        frame = sys._getframe().f_back
×
1138
        tb = None
×
1139
        while True:
×
1140
            if not frame:
×
1141
                break
×
1142
            tb = types.TracebackType(tb, frame, frame.f_lasti, frame.f_lineno)
×
1143
            frame = frame.f_back
×
1144
        return ValueError((msg % args) if args else msg).with_traceback(tb)
×
1145

1146
    def _patch_manifest(self,
5✔
1147
                        manifest: dict,
1148
                        resource_description: str):
1149
        for patcher in reversed(self._manifest_patchers):
5✔
1150
            logger.debug("Applying patcher %s to %s",
5✔
1151
                         getattr(patcher, "__name__", patcher),
1152
                         resource_description)
1153
            manifest = patcher(manifest, resource_description) or manifest
5✔
1154

1155
        return manifest
5✔
1156

1157
    def _transform_resource(self, resources: Sequence[K8SResource], resource: K8SResource) -> K8SResource:
5✔
1158
        for transformer in reversed(self._transformers):
5✔
1159
            logger.debug("Applying transformer %s to %s from %s",
5✔
1160
                         getattr(transformer, "__name__", transformer),
1161
                         resource, resource.source)
1162
            resource = transformer(resources, resource) or resource
5✔
1163

1164
        return resource
5✔
1165

1166
    def _validate_resources(self):
5✔
1167
        errors: list[Exception] = []
5✔
1168
        for resource in self.resources.values():
5✔
1169
            for validator in reversed(self._validators):
5✔
1170
                logger.debug("Applying validator %s to %s from %s",
5✔
1171
                             getattr(validator, "__name__", validator),
1172
                             resource, resource.source)
1173
                errors.extend(validator(self.resources, resource, self.api_validation_error))
5✔
1174
        if errors:
5!
1175
            for error in errors:
×
1176
                logger.error("Validation error: %s", error)
×
1177
            raise errors[0]
×
1178

1179
    def _apply_resource(self,
5✔
1180
                        dry_run,
1181
                        patch_field_excludes: Iterable[re.Pattern],
1182
                        resource: K8SResource,
1183
                        patch_func: Callable[[Iterable[dict]], Optional[dict]],
1184
                        create_func: Callable[[], Optional[dict]],
1185
                        delete_func: Callable[[K8SPropagationPolicy], None],
1186
                        status_msg):
1187
        from kubernetes import client
5✔
1188
        from kubernetes.client.rest import ApiException
5✔
1189

1190
        rdef = resource.rdef
5✔
1191
        rdef.populate_api(client, self.context.k8s.client)
5✔
1192

1193
        def handle_400_strict_validation_error(e: ApiException):
5✔
1194
            if e.status == 400:
5!
1195
                # Assumes the body has been parsed
1196
                status = e.body
5✔
1197
                if status["status"] == "Failure":
5!
1198
                    if FIELD_VALIDATION_STRICT_MARKER in status["message"]:
5!
1199
                        message = status["message"]
5✔
1200
                        messages = message[message.find(FIELD_VALIDATION_STRICT_MARKER) +
5✔
1201
                                           len(FIELD_VALIDATION_STRICT_MARKER):].split(",")
1202
                        for m in messages:
5✔
1203
                            self._api_warnings(resource, m.strip())
5✔
1204

1205
                        raise e from None
5✔
1206
                    else:
1207
                        logger.error("FAILED MODIFYING resource %s from %s: %s",
×
1208
                                     resource, resource.source, status["message"])
1209
                        raise e from None
×
1210

1211
        def create(exists_ok=False):
5✔
1212
            logger.info("Creating resource %s%s%s", resource, status_msg,
5✔
1213
                        " (ignoring existing)" if exists_ok else "")
1214
            try:
5✔
1215
                return create_func()
5✔
1216
            except ApiException as __e:
5✔
1217
                if exists_ok and __e.status == 409 and __e.body["reason"] == "AlreadyExists":
5!
1218
                    return None
×
1219
                raise
5✔
1220

1221
        merge_instrs, normalized_manifest = extract_merge_instructions(resource.manifest, resource)
5✔
1222
        if merge_instrs:
5✔
1223
            logger.trace("Normalized manifest (no merge instructions) for resource %s: %s", resource,
5✔
1224
                         normalized_manifest)
1225
        else:
1226
            normalized_manifest = resource.manifest
5✔
1227

        logger.debug("Applying resource %s%s", resource, status_msg)
        try:
            remote_resource = resource.get()
            logger.trace("Current resource %s: %s", resource, remote_resource)
            # v3 evaluates transition rules here (oldSelf bound to the
            # server's current state). v2 has no transition rules and
            # the resource was already schema-validated at add_resource
            # time, so skip a second pass.
            if self.validator.version == "v3":
                transition_errors = list(
                    self.validator.iter_errors(resource.manifest, resource.rdef,
                                               old_manifest=remote_resource))
                if transition_errors:
                    for err in transition_errors:
                        logger.error("Transition rule violation on %s from %s: %s",
                                     resource, resource.source, err)
                    raise transition_errors[0]
        except ApiException as e:
            try:
                if e.status == 404:
                    try:
                        return 1, 0, 0, create()
                    except ApiException as e:
                        if not handle_400_strict_validation_error(e):
                            raise
                else:
                    raise
            except ApiException as _e:
                api_exc_format_body(_e)
                raise
        else:
            while True:
                logger.trace("Attempting to retrieve a normalized patch for resource %s: %s",
                             resource, normalized_manifest)
                try:
                    merged_resource = resource.patch(normalized_manifest,
                                                     patch_type=K8SResourcePatchType.SERVER_SIDE_PATCH,
                                                     dry_run=True,
                                                     force=True)
                except ApiException as e:
                    try:
                        if e.status == 422:
                            status = e.body
                            # Assumes the body has been unmarshalled
                            details = status["details"]
                            immutable_key = details.get("group"), details["kind"]

                            try:
                                propagation_policy = self.context.k8s.immutable_changes[immutable_key]
                            except KeyError:
                                raise e from None
                            else:
                                for cause in details["causes"]:
                                    if (
                                            cause["reason"] == "FieldValueInvalid" and
                                            "field is immutable" in cause["message"]
                                            or
                                            cause["reason"] == "FieldValueForbidden" and
                                            ("Forbidden: updates to" in cause["message"]
                                             or
                                             "Forbidden: pod updates" in cause["message"])
                                    ):
                                        logger.info("Deleting resource %s (cascade %s)%s", resource,
                                                    propagation_policy.policy,
                                                    status_msg)
                                        delete_func(propagation_policy=propagation_policy)
                                        return 1, 0, 1, create(exists_ok=dry_run)
                                raise
                        else:
                            if not handle_400_strict_validation_error(e):
                                raise
                    except ApiException as _e:
                        api_exc_format_body(_e)
                        raise

                else:
                    logger.trace("Merged resource %s: %s", resource, merged_resource)
                    if merge_instrs:
                        apply_merge_instructions(merge_instrs, normalized_manifest, merged_resource, logger, resource)

                    patch = jsonpatch.make_patch(remote_resource, merged_resource)

                    resource_version = merged_resource["metadata"]["resourceVersion"]
                    resource_uid = merged_resource["metadata"]["uid"]
                    logger.trace("Resource %s adding resourceVersion %s and UID %s tests", resource, resource_version,
                                 resource_uid)
                    patch.patch.append({"op": "test", "path": "/metadata/uid", "value": resource_uid})
                    patch.patch.append({"op": "test", "path": "/metadata/resourceVersion", "value": resource_version})

                    logger.trace("Resource %s initial patches are: %s", resource, patch)
                    patch = self._filter_resource_patch(patch, patch_field_excludes)
                    logger.trace("Resource %s final patches are: %s", resource, patch)
                    if patch:
                        logger.info("Patching resource %s%s", resource, status_msg)
                        try:
                            return 0, 1, 0, patch_func(patch)
                        except ApiException as e:
                            if e.status == 409:
                                logger.warning("Patching resource %s%s encountered a conflict - will retry: \n%s",
                                               resource, status_msg, yaml.dump(e.body))
                                continue
                            raise
                    else:
                        logger.info("Nothing to patch for resource %s", resource)
                        return 0, 0, 0, None

    def _filter_resource_patch(self, patch: Iterable[Mapping], excludes: Iterable[re.compile]):
        result = []
        for op in patch:
            if op["op"] != "test":
                path = op["path"]
                excluded = False
                for exclude in excludes:
                    if exclude.match(path):
                        logger.trace("Excluding %r from patch %s", op, patch)
                        excluded = True
                        break
                if excluded:
                    continue
            result.append(op)
        return result

    def _setup_k8s_client(self):
        from kubernetes import client
        from kubernetes.config import load_incluster_config, load_kube_config, ConfigException

        try:
            logger.debug("Trying K8S in-cluster configuration")
            load_incluster_config()
            logger.info("Running K8S with in-cluster configuration")
        except ConfigException as e:
            logger.trace("K8S in-cluster configuration failed", exc_info=e)
            logger.debug("Initializing K8S with kubeconfig configuration")
            load_kube_config(config_file=self.context.kubeconfig.kubeconfig)

        k8s_client = client.ApiClient()

        # Patch the header content type selector to allow json patch
        k8s_client._select_header_content_type = k8s_client.select_header_content_type
        k8s_client.select_header_content_type = self._select_header_content_type_patch

        return k8s_client

    def _select_header_content_type_patch(self, content_types):
        """Returns `Content-Type` based on an array of content_types provided.
        :param content_types: List of content-types.
        :return: Content-Type (e.g. application/json).
        """

        content_type = self.context.k8s.client._select_header_content_type(content_types)
        if content_type == "application/merge-patch+json":
            return "application/json-patch+json"
        return content_type

    def __repr__(self):
        return "Kubernetes Plugin"
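
The patch-filtering step in `_filter_resource_patch` above can be tried in isolation. What follows is a minimal standalone sketch, not part of the plugin: the function name `filter_patch_ops`, the sample operations, and the exclude patterns are all hypothetical. It assumes, as the listing suggests, that each patch operation is a mapping with `op` and `path` keys and that each exclude is a compiled regular expression applied with `match`.

```python
import re
from typing import Iterable, List, Mapping, Pattern


def filter_patch_ops(ops: Iterable[Mapping], excludes: Iterable[Pattern]) -> List[Mapping]:
    """Drop non-"test" ops whose path matches an exclude; always keep "test" ops."""
    excludes = list(excludes)  # allow the patterns to be scanned once per op
    result = []
    for op in ops:
        if op["op"] != "test" and any(rx.match(op["path"]) for rx in excludes):
            continue  # excluded field: leave it out of the final patch
        result.append(op)
    return result


# Hypothetical sample data, for illustration only.
ops = [
    {"op": "replace", "path": "/metadata/generation", "value": 2},
    {"op": "replace", "path": "/spec/replicas", "value": 3},
    {"op": "test", "path": "/metadata/uid", "value": "hypothetical-uid"},
]
excludes = [re.compile(r"/metadata/generation"), re.compile(r"/metadata/uid")]

kept = filter_patch_ops(ops, excludes)
print([op["path"] for op in kept])
```

The `test` operation survives even though its path matches an exclude pattern. This mirrors the listing, where the `uid` and `resourceVersion` guards are appended before filtering and must reach the server so that a stale patch is rejected rather than applied.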