• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NVIDIA / skyhook / 21848362404

10 Feb 2026 01:46AM UTC coverage: 80.713% (-0.1%) from 80.83%
21848362404

push

github

lockwobr
fix: resolve webhook caBundle deadlock during helm upgrade

During helm upgrade, the webhook configurations' caBundle field was
reset to empty, causing new pods to fail readiness checks while the
old leader pod never detected the change (only watched the cert
Secret, with a 24h requeue). This created a deadlock where no pod
could fix the caBundle.

- Watch ValidatingWebhookConfiguration and MutatingWebhookConfiguration
  so the leader detects caBundle changes immediately
- Use bytes.Equal for caBundle comparison instead of len==0 so stale
  values are corrected, not just empty ones
- Remove caBundle from Helm webhook templates so upgrades stop
  resetting operator-managed values

4 of 15 new or added lines in 1 file covered. (26.67%)

4 existing lines in 1 file now uncovered.

6817 of 8446 relevant lines covered (80.71%)

4.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.83
/operator/internal/controller/skyhook_controller.go
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package controller
20

21
import (
22
        "cmp"
23
        "context"
24
        "crypto/sha256"
25
        "encoding/hex"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "slices"
31
        "sort"
32
        "strings"
33
        "time"
34

35
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
36
        "github.com/NVIDIA/skyhook/operator/internal/dal"
37
        "github.com/NVIDIA/skyhook/operator/internal/version"
38
        "github.com/NVIDIA/skyhook/operator/internal/wrapper"
39
        "github.com/go-logr/logr"
40

41
        corev1 "k8s.io/api/core/v1"
42
        policyv1 "k8s.io/api/policy/v1"
43
        apierrors "k8s.io/apimachinery/pkg/api/errors"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/runtime"
47
        "k8s.io/apimachinery/pkg/types"
48
        utilerrors "k8s.io/apimachinery/pkg/util/errors"
49
        "k8s.io/client-go/tools/record"
50
        "k8s.io/kubernetes/pkg/util/taints"
51
        ctrl "sigs.k8s.io/controller-runtime"
52
        "sigs.k8s.io/controller-runtime/pkg/client"
53
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/log"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
)
58

59
const (
60
        EventsReasonSkyhookApply       = "Apply"
61
        EventsReasonSkyhookInterrupt   = "Interrupt"
62
        EventsReasonSkyhookDrain       = "Drain"
63
        EventsReasonSkyhookStateChange = "State"
64
        EventsReasonNodeReboot         = "Reboot"
65
        EventTypeNormal                = "Normal"
66
        // EventTypeWarning = "Warning"
67
        TaintUnschedulable     = corev1.TaintNodeUnschedulable
68
        InterruptContainerName = "interrupt"
69

70
        SkyhookFinalizer = "skyhook.nvidia.com/skyhook"
71
)
72

73
// SkyhookOperatorOptions holds the operator-wide settings. Each field carries
// an `env` tag naming an environment variable and, for most fields, a default
// (presumably consumed by an env-based config loader; confirm against the
// caller). Validate reports which values are unusable.
type SkyhookOperatorOptions struct {
	// Namespace must be non-empty.
	Namespace string `env:"NAMESPACE, default=skyhook"`
	// MaxInterval is the requeue delay used when a reconcile pass finishes with
	// nothing else pending; it must be at least one minute.
	MaxInterval time.Duration `env:"DEFAULT_INTERVAL, default=10m"`
	ImagePullSecret string `env:"IMAGE_PULL_SECRET"`
	// CopyDirRoot must be non-empty and start with "/".
	CopyDirRoot string `env:"COPY_DIR_ROOT, default=/var/lib/skyhook"`
	// ReapplyOnReboot makes a detected node reboot reset the node so it is
	// reapplied.
	ReapplyOnReboot bool `env:"REAPPLY_ON_REBOOT, default=false"`
	// RuntimeRequiredTaint is a taint spec parsed with taints.ParseTaints; it
	// must parse and must not be a deletion.
	RuntimeRequiredTaint string `env:"RUNTIME_REQUIRED_TAINT, default=skyhook.nvidia.com=runtime-required:NoSchedule"`
	// PauseImage must be non-empty and contain a tag.
	PauseImage string `env:"PAUSE_IMAGE, default=registry.k8s.io/pause:3.10"`
	// AgentImage must be non-empty and contain a tag.
	AgentImage string `env:"AGENT_IMAGE, default=ghcr.io/nvidia/skyhook/agent:latest"` // TODO: this needs to be updated with a working default
	AgentLogRoot string `env:"AGENT_LOG_ROOT, default=/var/log/skyhook"`
}
84

85
func (o *SkyhookOperatorOptions) Validate() error {
8✔
86

8✔
87
        messages := make([]string, 0)
8✔
88
        if o.Namespace == "" {
8✔
89
                messages = append(messages, "namespace must be set")
×
90
        }
×
91
        if o.CopyDirRoot == "" {
8✔
92
                messages = append(messages, "copy dir root must be set")
×
93
        }
×
94
        if o.RuntimeRequiredTaint == "" {
8✔
95
                messages = append(messages, "runtime required taint must be set")
×
96
        }
×
97
        if o.MaxInterval < time.Minute {
9✔
98
                messages = append(messages, "max interval must be at least 1 minute")
1✔
99
        }
1✔
100

101
        // CopyDirRoot must start with /
102
        if !strings.HasPrefix(o.CopyDirRoot, "/") {
9✔
103
                messages = append(messages, "copy dir root must start with /")
1✔
104
        }
1✔
105

106
        // RuntimeRequiredTaint must be parsable and must not be a deletion
107
        _, delete, err := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
108
        if err != nil {
9✔
109
                messages = append(messages, fmt.Sprintf("runtime required taint is invalid: %s", err.Error()))
1✔
110
        }
1✔
111
        if len(delete) > 0 {
9✔
112
                messages = append(messages, "runtime required taint must not be a deletion")
1✔
113
        }
1✔
114

115
        if o.AgentImage == "" {
9✔
116
                messages = append(messages, "agent image must be set")
1✔
117
        }
1✔
118

119
        if !strings.Contains(o.AgentImage, ":") {
9✔
120
                messages = append(messages, "agent image must contain a tag")
1✔
121
        }
1✔
122

123
        if o.PauseImage == "" {
9✔
124
                messages = append(messages, "pause image must be set")
1✔
125
        }
1✔
126

127
        if !strings.Contains(o.PauseImage, ":") {
9✔
128
                messages = append(messages, "pause image must contain a tag")
1✔
129
        }
1✔
130

131
        if len(messages) > 0 {
9✔
132
                return errors.New(strings.Join(messages, ", "))
1✔
133
        }
1✔
134

135
        return nil
8✔
136
}
137

138
// AgentVersion returns the image tag portion of AgentImage
139
func (o *SkyhookOperatorOptions) AgentVersion() string {
8✔
140
        parts := strings.Split(o.AgentImage, ":")
8✔
141
        return parts[len(parts)-1]
8✔
142
}
8✔
143

144
func (o *SkyhookOperatorOptions) GetRuntimeRequiredTaint() corev1.Taint {
8✔
145
        to_add, _, _ := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
146
        return to_add[0]
8✔
147
}
8✔
148

149
func (o *SkyhookOperatorOptions) GetRuntimeRequiredToleration() corev1.Toleration {
8✔
150
        taint := o.GetRuntimeRequiredTaint()
8✔
151
        return corev1.Toleration{
8✔
152
                Key:      taint.Key,
8✔
153
                Operator: corev1.TolerationOpEqual,
8✔
154
                Value:    taint.Value,
8✔
155
                Effect:   taint.Effect,
8✔
156
        }
8✔
157
}
8✔
158

159
// force type checking against this interface
160
var _ reconcile.Reconciler = &SkyhookReconciler{}
161

162
func NewSkyhookReconciler(schema *runtime.Scheme, c client.Client, recorder record.EventRecorder, opts SkyhookOperatorOptions) (*SkyhookReconciler, error) {
8✔
163

8✔
164
        err := opts.Validate()
8✔
165
        if err != nil {
8✔
166
                return nil, fmt.Errorf("invalid skyhook operator options: %w", err)
×
167
        }
×
168

169
        return &SkyhookReconciler{
8✔
170
                Client:   c,
8✔
171
                scheme:   schema,
8✔
172
                recorder: recorder,
8✔
173
                opts:     opts,
8✔
174
                dal:      dal.New(c),
8✔
175
        }, nil
8✔
176
}
177

178
// SkyhookReconciler reconciles a Skyhook object
179
type SkyhookReconciler struct {
180
        client.Client
181
        scheme   *runtime.Scheme
182
        recorder record.EventRecorder
183
        opts     SkyhookOperatorOptions
184
        dal      dal.DAL
185
}
186

187
// SetupWithManager sets up the controller with the Manager.
188
func (r *SkyhookReconciler) SetupWithManager(mgr ctrl.Manager) error {
8✔
189

8✔
190
        // indexes allow for query on fields to use the local cache
8✔
191
        indexer := mgr.GetFieldIndexer()
8✔
192
        err := indexer.
8✔
193
                IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
15✔
194
                        pod, ok := o.(*corev1.Pod)
7✔
195
                        if !ok {
7✔
196
                                return nil
×
197
                        }
×
198
                        return []string{pod.Spec.NodeName}
7✔
199
                })
200

201
        if err != nil {
8✔
202
                return err
×
203
        }
×
204

205
        ehandler := &eventHandler{
8✔
206
                logger: mgr.GetLogger(),
8✔
207
                dal:    dal.New(r.Client),
8✔
208
        }
8✔
209

8✔
210
        return ctrl.NewControllerManagedBy(mgr).
8✔
211
                For(&v1alpha1.Skyhook{}).
8✔
212
                Watches(
8✔
213
                        &corev1.Pod{},
8✔
214
                        handler.EnqueueRequestsFromMapFunc(podHandlerFunc),
8✔
215
                ).
8✔
216
                Watches(
8✔
217
                        &corev1.Node{},
8✔
218
                        ehandler,
8✔
219
                ).
8✔
220
                Complete(r)
8✔
221
}
222

223
// CRD Permissions
224
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks,verbs=get;list;watch;create;update;patch;delete
225
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/status,verbs=get;update;patch
226
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/finalizers,verbs=update
227
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=deploymentpolicies,verbs=get;list;watch
228

229
// core permissions
230
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;update;patch;watch
231
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
232
//+kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
233
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
234
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
235

236
// Reconcile is part of the main kubernetes reconciliation loop which aims to
237
// move the current state of the cluster closer to the desired state.
238
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile
239
func (r *SkyhookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
8✔
240
        logger := log.FromContext(ctx)
8✔
241
        // split off requests for pods
8✔
242
        if strings.HasPrefix(req.Name, "pod---") {
15✔
243
                name := strings.Split(req.Name, "pod---")[1]
7✔
244
                pod, err := r.dal.GetPod(ctx, req.Namespace, name)
7✔
245
                if err == nil && pod != nil { // if pod, then call other wise not a pod
14✔
246
                        return r.PodReconcile(ctx, pod)
7✔
247
                }
7✔
248
                return ctrl.Result{}, err
7✔
249
        }
250

251
        // get all skyhooks (SCR)
252
        skyhooks, err := r.dal.GetSkyhooks(ctx)
8✔
253
        if err != nil {
8✔
254
                // error, going to requeue and backoff
×
255
                logger.Error(err, "error getting skyhooks")
×
256
                return ctrl.Result{}, err
×
257
        }
×
258

259
        // if there are no skyhooks, so actually nothing to do, so don't requeue
260
        if skyhooks == nil || len(skyhooks.Items) == 0 {
16✔
261
                return ctrl.Result{}, nil
8✔
262
        }
8✔
263

264
        // get all nodes
265
        nodes, err := r.dal.GetNodes(ctx)
7✔
266
        if err != nil {
7✔
267
                // error, going to requeue and backoff
×
268
                logger.Error(err, "error getting nodes")
×
269
                return ctrl.Result{}, err
×
270
        }
×
271

272
        // if no nodes, well not work to do either
273
        if nodes == nil || len(nodes.Items) == 0 {
7✔
274
                // no nodes, so nothing to do
×
275
                return ctrl.Result{}, nil
×
276
        }
×
277

278
        // get all deployment policies
279
        deploymentPolicies, err := r.dal.GetDeploymentPolicies(ctx)
7✔
280
        if err != nil {
7✔
281
                logger.Error(err, "error getting deployment policies")
×
282
                return ctrl.Result{}, err
×
283
        }
×
284

285
        // TODO: this build state could error in a lot of ways, and I think we might want to move towards partial state
286
        // mean if we cant get on SCR state, great, process that one and error
287

288
        // BUILD cluster state from all skyhooks, and all nodes
289
        // this filters and pairs up nodes to skyhooks, also provides help methods for introspection and mutation
290
        clusterState, err := BuildState(skyhooks, nodes, deploymentPolicies)
7✔
291
        if err != nil {
7✔
292
                // error, going to requeue and backoff
×
293
                logger.Error(err, "error building cluster state")
×
294
                return ctrl.Result{}, err
×
295
        }
×
296

297
        if yes, result, err := shouldReturn(r.HandleMigrations(ctx, clusterState)); yes {
14✔
298
                return result, err
7✔
299
        }
7✔
300

301
        if yes, result, err := shouldReturn(r.TrackReboots(ctx, clusterState)); yes {
14✔
302
                return result, err
7✔
303
        }
7✔
304

305
        // node picker is for selecting nodes to do work, tries maintain a prior of nodes between SCRs
306
        nodePicker := NewNodePicker(logger, r.opts.GetRuntimeRequiredToleration())
7✔
307

7✔
308
        errs := make([]error, 0)
7✔
309
        var result *ctrl.Result
7✔
310

7✔
311
        for _, skyhook := range clusterState.skyhooks {
14✔
312
                if yes, result, err := shouldReturn(r.HandleFinalizer(ctx, skyhook)); yes {
14✔
313
                        return result, err
7✔
314
                }
7✔
315

316
                if yes, result, err := shouldReturn(r.ReportState(ctx, clusterState, skyhook)); yes {
14✔
317
                        return result, err
7✔
318
                }
7✔
319

320
                if skyhook.IsPaused() {
13✔
321
                        if yes, result, err := shouldReturn(r.UpdatePauseStatus(ctx, clusterState, skyhook)); yes {
12✔
322
                                return result, err
6✔
323
                        }
6✔
324
                        continue
6✔
325
                }
326

327
                if yes, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState); yes {
13✔
328
                        return result, err
6✔
329
                }
6✔
330

331
                changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
332
                if changed {
14✔
333
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
334
                        if len(errs) > 0 {
12✔
335
                                return ctrl.Result{RequeueAfter: time.Second * 2}, utilerrors.NewAggregate(errs)
5✔
336
                        }
5✔
337
                        return ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
338
                }
339

340
                _, err := HandleVersionChange(skyhook)
7✔
341
                if err != nil {
7✔
342
                        return ctrl.Result{RequeueAfter: time.Second * 2}, fmt.Errorf("error getting packages to uninstall: %w", err)
×
343
                }
×
344
        }
345

346
        // Process all non-complete, non-disabled skyhooks (in priority order)
347
        // Each skyhook is processed only for nodes that are ready (all higher-priority skyhooks complete on that node)
348
        // This enables per-node priority ordering: nodes can progress independently
349
        result, err = r.processSkyhooksPerNode(ctx, clusterState, nodePicker, logger)
7✔
350
        if err != nil {
12✔
351
                errs = append(errs, err)
5✔
352
        }
5✔
353

354
        err = r.HandleRuntimeRequired(ctx, clusterState)
7✔
355
        if err != nil {
7✔
356
                errs = append(errs, err)
×
357
        }
×
358

359
        if len(errs) > 0 {
12✔
360
                err := utilerrors.NewAggregate(errs)
5✔
361
                return ctrl.Result{}, err
5✔
362
        }
5✔
363

364
        if result != nil {
14✔
365
                return *result, nil
7✔
366
        }
7✔
367

368
        // default happy retry after max
369
        return ctrl.Result{RequeueAfter: r.opts.MaxInterval}, nil
7✔
370
}
371

372
// processSkyhooksPerNode processes all skyhooks for nodes that are ready (per-node priority ordering).
373
// A node is ready for a skyhook if all higher-priority skyhooks are complete on that specific node.
374
func (r *SkyhookReconciler) processSkyhooksPerNode(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, logger logr.Logger) (*ctrl.Result, error) {
7✔
375
        var result *ctrl.Result
7✔
376
        var errs []error
7✔
377

7✔
378
        for _, skyhook := range clusterState.skyhooks {
14✔
379
                if skyhook.IsComplete() || skyhook.IsDisabled() || skyhook.IsPaused() {
14✔
380
                        continue
7✔
381
                }
382

383
                // Check if any nodes are ready for this skyhook
384
                if !hasReadyNodesForSkyhook(skyhook, clusterState.skyhooks) {
12✔
385
                        continue
5✔
386
                }
387

388
                res, err := r.RunSkyhookPackages(ctx, clusterState, nodePicker, skyhook)
7✔
389
                if err != nil {
12✔
390
                        logger.Error(err, "error processing skyhook", "skyhook", skyhook.GetSkyhook().Name)
5✔
391
                        errs = append(errs, err)
5✔
392
                }
5✔
393
                if res != nil {
14✔
394
                        result = res
7✔
395
                }
7✔
396
        }
397

398
        if len(errs) > 0 {
12✔
399
                return result, utilerrors.NewAggregate(errs)
5✔
400
        }
5✔
401
        return result, nil
7✔
402
}
403

404
// hasReadyNodesForSkyhook checks if any nodes are ready to process this skyhook.
405
// A node is ready if it's not complete and all higher-priority skyhooks are complete on that node.
406
func hasReadyNodesForSkyhook(skyhook SkyhookNodes, allSkyhooks []SkyhookNodes) bool {
7✔
407
        for _, node := range skyhook.GetNodes() {
14✔
408
                if !node.IsComplete() && IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, allSkyhooks) {
14✔
409
                        return true
7✔
410
                }
7✔
411
        }
412
        return false
5✔
413
}
414

415
func shouldReturn(updates bool, err error) (bool, ctrl.Result, error) {
7✔
416
        if err != nil {
14✔
417
                return true, ctrl.Result{}, err
7✔
418
        }
7✔
419
        if updates {
14✔
420
                return true, ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
421
        }
7✔
422
        return false, ctrl.Result{}, nil
7✔
423
}
424

425
func (r *SkyhookReconciler) HandleMigrations(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
426

7✔
427
        updates := false
7✔
428

7✔
429
        if version.VERSION == "" {
7✔
430
                // this means the binary was complied without version information
×
431
                return false, nil
×
432
        }
×
433

434
        logger := log.FromContext(ctx)
7✔
435
        errors := make([]error, 0)
7✔
436
        for _, skyhook := range clusterState.skyhooks {
14✔
437

7✔
438
                err := skyhook.Migrate(logger)
7✔
439
                if err != nil {
7✔
440
                        return false, fmt.Errorf("error migrating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
441
                }
×
442

443
                if err := skyhook.GetSkyhook().Skyhook.Validate(); err != nil {
7✔
444
                        return false, fmt.Errorf("error validating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
445
                }
×
446

447
                for _, node := range skyhook.GetNodes() {
14✔
448
                        if node.Changed() {
14✔
449
                                err := r.Status().Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
450
                                if err != nil {
7✔
451
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
452
                                }
×
453

454
                                err = r.Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
455
                                if err != nil {
7✔
456
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
457
                                }
×
458
                                updates = true
7✔
459
                        }
460
                }
461

462
                if skyhook.GetSkyhook().Updated {
14✔
463
                        // need to do this because SaveNodesAndSkyhook only saves skyhook status, not the main skyhook object where the annotations are
7✔
464
                        // additionally it needs to be an update, a patch nils out the annotations for some reason, which the save function does a patch
7✔
465

7✔
466
                        if err = r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
467
                                return false, fmt.Errorf("error updating during migration skyhook status [%s]: %w", skyhook.GetSkyhook().Name, err)
7✔
468
                        }
7✔
469

470
                        // because of conflict issues (409) we need to do things a bit differently here.
471
                        // We might be able to use server side apply in the future, but for now we need to do this
472
                        // https://kubernetes.io/docs/reference/using-api/server-side-apply/
473
                        // https://github.com/kubernetes-sigs/controller-runtime/issues/347
474

475
                        // work around for now is to grab a new copy of the object, and then patch it
476

477
                        newskyhook, err := r.dal.GetSkyhook(ctx, skyhook.GetSkyhook().Name)
7✔
478
                        if err != nil {
7✔
479
                                return false, fmt.Errorf("error getting skyhook to migrate [%s]: %w", skyhook.GetSkyhook().Name, err)
×
480
                        }
×
481
                        newPatch := client.MergeFrom(newskyhook.DeepCopy())
7✔
482

7✔
483
                        // set version
7✔
484
                        wrapper.NewSkyhookWrapper(newskyhook).SetVersion()
7✔
485

7✔
486
                        if err = r.Patch(ctx, newskyhook, newPatch); err != nil {
7✔
487
                                return false, fmt.Errorf("error updating during migration skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
488
                        }
×
489

490
                        updates = true
7✔
491
                }
492
        }
493

494
        if len(errors) > 0 {
7✔
495
                return false, utilerrors.NewAggregate(errors)
×
496
        }
×
497

498
        return updates, nil
7✔
499
}
500

501
// ReportState computes and puts important information into the skyhook status so that monitoring tools such as k9s
502
// can see the information at a glance. For example, the number of completed nodes and the list of packages in the skyhook.
503
func (r *SkyhookReconciler) ReportState(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
7✔
504

7✔
505
        // save updated state to skyhook status
7✔
506
        skyhook.ReportState()
7✔
507

7✔
508
        if skyhook.GetSkyhook().Updated {
14✔
509
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
510
                if len(errs) > 0 {
7✔
511
                        return false, utilerrors.NewAggregate(errs)
×
512
                }
×
513
                return true, nil
7✔
514
        }
515

516
        return false, nil
7✔
517
}
518

519
func (r *SkyhookReconciler) UpdatePauseStatus(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
6✔
520
        changed := UpdateSkyhookPauseStatus(skyhook)
6✔
521

6✔
522
        if changed {
12✔
523
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
524
                if len(errs) > 0 {
6✔
525
                        return false, utilerrors.NewAggregate(errs)
×
526
                }
×
527
                return true, nil
6✔
528
        }
529

530
        return false, nil
6✔
531
}
532

533
func (r *SkyhookReconciler) TrackReboots(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
534

7✔
535
        updates := false
7✔
536
        errs := make([]error, 0)
7✔
537

7✔
538
        for _, skyhook := range clusterState.skyhooks {
14✔
539
                if skyhook.GetSkyhook().Status.NodeBootIds == nil {
14✔
540
                        skyhook.GetSkyhook().Status.NodeBootIds = make(map[string]string)
7✔
541
                }
7✔
542

543
                for _, node := range skyhook.GetNodes() {
14✔
544
                        id, ok := skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name]
7✔
545

7✔
546
                        if !ok { // new node
14✔
547
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
7✔
548
                                skyhook.GetSkyhook().Updated = true
7✔
549
                        }
7✔
550

551
                        if id != "" && id != node.GetNode().Status.NodeInfo.BootID { // node rebooted
7✔
552
                                if r.opts.ReapplyOnReboot {
×
553
                                        r.recorder.Eventf(skyhook.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node [%s] to be reapplied", node.GetNode().Name)
×
554
                                        r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node for [%s] to be reapplied", node.GetSkyhook().Name)
×
555
                                        node.Reset()
×
556
                                }
×
557
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
×
558
                                skyhook.GetSkyhook().Updated = true
×
559
                        }
560

561
                        if node.Changed() { // update
7✔
562
                                updates = true
×
563
                                err := r.Update(ctx, node.GetNode())
×
564
                                if err != nil {
×
565
                                        errs = append(errs, fmt.Errorf("error updating node after reboot [%s]: %w", node.GetNode().Name, err))
×
566
                                }
×
567
                        }
568
                }
569
                if skyhook.GetSkyhook().Updated { // update
14✔
570
                        updates = true
7✔
571
                        err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook)
7✔
572
                        if err != nil {
14✔
573
                                errs = append(errs, fmt.Errorf("error updating skyhook status after reboot [%s]: %w", skyhook.GetSkyhook().Name, err))
7✔
574
                        }
7✔
575
                }
576
        }
577

578
        return updates, utilerrors.NewAggregate(errs)
7✔
579
}
580

581
// RunSkyhookPackages runs all skyhook packages then saves and requeues if changes were made
582
func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, skyhook SkyhookNodes) (*ctrl.Result, error) {
7✔
583

7✔
584
        logger := log.FromContext(ctx)
7✔
585
        requeue := false
7✔
586

7✔
587
        toUninstall, err := HandleVersionChange(skyhook)
7✔
588
        if err != nil {
7✔
589
                return nil, fmt.Errorf("error getting packages to uninstall: %w", err)
×
590
        }
×
591

592
        changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
593
        if !changed && skyhook.IsComplete() {
7✔
594
                return nil, nil
×
595
        }
×
596

597
        selectedNode := nodePicker.SelectNodes(skyhook)
7✔
598

7✔
599
        for _, node := range selectedNode {
14✔
600
                // Skip nodes that are waiting on higher-priority skyhooks
7✔
601
                // This enables per-node priority ordering
7✔
602
                if !IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, clusterState.skyhooks) {
12✔
603
                        continue
5✔
604
                }
605

606
                if node.IsComplete() && !node.Changed() {
7✔
607
                        continue
×
608
                }
609

610
                toRun, err := node.RunNext()
7✔
611
                if err != nil {
7✔
612
                        return nil, fmt.Errorf("error getting next packages to run: %w", err)
×
613
                }
×
614

615
                // prepend the uninstall packages so they are ran first
616
                toRun = append(toUninstall, toRun...)
7✔
617

7✔
618
                interrupt, pack := fudgeInterruptWithPriority(toRun, skyhook.GetSkyhook().GetConfigUpdates(), skyhook.GetSkyhook().GetConfigInterrupts())
7✔
619

7✔
620
                for _, f := range toRun {
14✔
621

7✔
622
                        ok, err := r.ProcessInterrupt(ctx, node, f, interrupt, interrupt != nil && f.Name == pack)
7✔
623
                        if err != nil {
7✔
624
                                // TODO: error handle
×
625
                                return nil, fmt.Errorf("error processing if we should interrupt [%s:%s]: %w", f.Name, f.Version, err)
×
626
                        }
×
627
                        if !ok {
12✔
628
                                requeue = true
5✔
629
                                continue
5✔
630
                        }
631

632
                        err = r.ApplyPackage(ctx, logger, clusterState, node, f, interrupt != nil && f.Name == pack)
7✔
633
                        if err != nil {
7✔
634
                                return nil, fmt.Errorf("error applying package [%s:%s]: %w", f.Name, f.Version, err)
×
635
                        }
×
636

637
                        // process one package at a time
638
                        if skyhook.GetSkyhook().Spec.Serial {
7✔
639
                                return &ctrl.Result{Requeue: true}, nil
×
640
                        }
×
641
                }
642
        }
643

644
        saved, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
645
        if len(errs) > 0 {
12✔
646
                return &ctrl.Result{}, utilerrors.NewAggregate(errs)
5✔
647
        }
5✔
648
        if saved {
14✔
649
                requeue = true
7✔
650
        }
7✔
651

652
        if !skyhook.IsComplete() || requeue {
14✔
653
                return &ctrl.Result{RequeueAfter: time.Second * 2}, nil // not sure this is better then just requeue bool
7✔
654
        }
7✔
655

656
        return nil, utilerrors.NewAggregate(errs)
×
657
}
658

659
// SaveNodesAndSkyhook saves nodes and skyhook and will update the events if the skyhook status changes
660
func (r *SkyhookReconciler) SaveNodesAndSkyhook(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, []error) {
7✔
661
        saved := false
7✔
662
        errs := make([]error, 0)
7✔
663

7✔
664
        for _, node := range skyhook.GetNodes() {
14✔
665
                patch := client.StrategicMergeFrom(clusterState.tracker.GetOriginal(node.GetNode()))
7✔
666
                if node.Changed() {
14✔
667
                        err := r.Patch(ctx, node.GetNode(), patch)
7✔
668
                        if err != nil {
7✔
669
                                errs = append(errs, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
670
                        }
×
671
                        saved = true
7✔
672

7✔
673
                        err = r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhook.GetSkyhook(), node.GetNode())
7✔
674
                        if err != nil {
7✔
675
                                errs = append(errs, fmt.Errorf("error upserting labels, annotations, and packages config map for node [%s]: %w", node.GetNode().Name, err))
×
676
                        }
×
677

678
                        if node.IsComplete() {
14✔
679
                                r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook [%s] complete.", skyhook.GetSkyhook().Name)
7✔
680

7✔
681
                                // since node is complete remove from priority
7✔
682
                                if _, ok := skyhook.GetSkyhook().Status.NodePriority[node.GetNode().Name]; ok {
14✔
683
                                        delete(skyhook.GetSkyhook().Status.NodePriority, node.GetNode().Name)
7✔
684
                                        skyhook.GetSkyhook().Updated = true
7✔
685
                                }
7✔
686
                        }
687
                }
688

689
                // updates node's condition
690
                node.UpdateCondition()
7✔
691
                if node.Changed() {
14✔
692
                        // conditions are in status
7✔
693
                        err := r.Status().Patch(ctx, node.GetNode(), patch)
7✔
694
                        if err != nil {
12✔
695
                                errs = append(errs, fmt.Errorf("error patching node status [%s]: %w", node.GetNode().Name, err))
5✔
696
                        }
5✔
697
                        saved = true
7✔
698
                }
699

700
                if node.GetSkyhook() != nil && node.GetSkyhook().Updated {
14✔
701
                        skyhook.GetSkyhook().Updated = true
7✔
702
                }
7✔
703
        }
704

705
        if skyhook.GetSkyhook().Updated {
14✔
706
                patch := client.MergeFrom(clusterState.tracker.GetOriginal(skyhook.GetSkyhook().Skyhook))
7✔
707
                err := r.Status().Patch(ctx, skyhook.GetSkyhook().Skyhook, patch)
7✔
708
                if err != nil {
7✔
709
                        errs = append(errs, err)
×
710
                }
×
711
                saved = true
7✔
712

7✔
713
                if skyhook.GetPriorStatus() != "" && skyhook.GetPriorStatus() != skyhook.Status() {
14✔
714
                        // we transitioned, fire event
7✔
715
                        r.recorder.Eventf(skyhook.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook transitioned [%s] -> [%s]", skyhook.GetPriorStatus(), skyhook.Status())
7✔
716
                }
7✔
717
        }
718

719
        if len(errs) > 0 {
12✔
720
                saved = false
5✔
721
        }
5✔
722
        return saved, errs
7✔
723
}
724

725
// HandleVersionChange updates the state for the node or skyhook if a version is changed on a package
726
func HandleVersionChange(skyhook SkyhookNodes) ([]*v1alpha1.Package, error) {
8✔
727
        toUninstall := make([]*v1alpha1.Package, 0)
8✔
728
        versionChangeDetected := false
8✔
729

8✔
730
        for _, node := range skyhook.GetNodes() {
16✔
731
                nodeState, err := node.State()
8✔
732
                if err != nil {
8✔
733
                        return nil, err
×
734
                }
×
735

736
                for _, packageStatus := range nodeState {
16✔
737
                        upgrade := false
8✔
738

8✔
739
                        _package, exists := skyhook.GetSkyhook().Spec.Packages[packageStatus.Name]
8✔
740
                        if exists && _package.Version == packageStatus.Version {
16✔
741
                                continue // no uninstall needed for package
8✔
742
                        }
743

744
                        packageStatusRef := v1alpha1.PackageRef{
6✔
745
                                Name:    packageStatus.Name,
6✔
746
                                Version: packageStatus.Version,
6✔
747
                        }
6✔
748

6✔
749
                        if !exists && packageStatus.Stage != v1alpha1.StageUninstall {
11✔
750
                                // Start uninstall of old package
5✔
751
                                err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
752
                                if err != nil {
5✔
753
                                        return nil, fmt.Errorf("error updating node status: %w", err)
×
754
                                }
×
755
                                versionChangeDetected = true
5✔
756
                        } else if exists && _package.Version != packageStatus.Version {
12✔
757
                                versionChangeDetected = true
6✔
758
                                comparison := version.Compare(_package.Version, packageStatus.Version)
6✔
759
                                if comparison == -2 {
6✔
760
                                        return nil, errors.New("error comparing package versions: invalid version string provided enabling webhooks validates versions before being applied")
×
761
                                }
×
762

763
                                if comparison == 1 {
12✔
764
                                        _packageStatus, found := node.PackageStatus(_package.GetUniqueName())
6✔
765
                                        if found && _packageStatus.Stage == v1alpha1.StageUpgrade {
12✔
766
                                                continue
6✔
767
                                        }
768

769
                                        // start upgrade of package
770
                                        err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageUpgrade, 0, _package.ContainerSHA)
5✔
771
                                        if err != nil {
5✔
772
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
773
                                        }
×
774

775
                                        upgrade = true
5✔
776
                                } else if comparison == -1 && packageStatus.Stage != v1alpha1.StageUninstall {
10✔
777
                                        // Start uninstall of old package
5✔
778
                                        err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
779
                                        if err != nil {
5✔
780
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
781
                                        }
×
782

783
                                        // If version changed then update new version to wait
784
                                        err = node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, v1alpha1.StageUninstall, 0, _package.ContainerSHA)
5✔
785
                                        if err != nil {
5✔
786
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
787
                                        }
×
788
                                }
789
                        }
790

791
                        // only need to create a feaux package for uninstall since it won't be in the DAG (Upgrade will)
792
                        newPackageStatus, found := node.PackageStatus(packageStatusRef.GetUniqueName())
5✔
793
                        if !upgrade && found && newPackageStatus.Stage == v1alpha1.StageUninstall && newPackageStatus.State == v1alpha1.StateInProgress {
10✔
794
                                // create fake package with the info we can salvage from the node state
5✔
795
                                newPackage := &v1alpha1.Package{
5✔
796
                                        PackageRef: packageStatusRef,
5✔
797
                                        Image:      packageStatus.Image,
5✔
798
                                }
5✔
799

5✔
800
                                // Add package to uninstall list if it's not already present
5✔
801
                                found := false
5✔
802
                                for _, uninstallPackage := range toUninstall {
10✔
803
                                        if reflect.DeepEqual(uninstallPackage, newPackage) {
5✔
804
                                                found = true
×
805
                                        }
×
806
                                }
807

808
                                if !found {
10✔
809
                                        toUninstall = append(toUninstall, newPackage)
5✔
810
                                }
5✔
811
                        }
812

813
                        // remove all config updates for the package since it's being uninstalled or
814
                        // upgraded. NOTE: The config updates must be removed whenever the version changes
815
                        // or else the package interrupt may be skipped if there is one
816
                        skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
817

5✔
818
                        // set the node and skyhook status to in progress
5✔
819
                        node.SetStatus(v1alpha1.StatusInProgress)
5✔
820
                }
821
        }
822

823
        // Auto-reset batch state when version changes are detected (if configured)
824
        if versionChangeDetected {
14✔
825
                resetSkyhookBatchState(skyhook)
6✔
826
        }
6✔
827

828
        return toUninstall, nil
8✔
829
}
830

831
// ptr returns a pointer to a fresh copy of e, which is handy for taking the
// address of literals and other non-addressable values.
func ptr[E any](e E) *E {
	out := new(E)
	*out = e
	return out
}
8✔
835

836
// generateSafeName generates a consistent name for Kubernetes resources that is unique
// while staying within the specified character limit.
//
// The parts are joined with "-", dots are replaced with dashes (they are not allowed in
// resource names), and the first 8 hex characters of the SHA-256 of that joined name are
// appended as "-<hash>". The joined name is truncated as needed so the whole result is at
// most maxLen characters, and the result is lower-cased.
//
// When there is no room for any part of the name (maxLen of 9 or less), or when there
// is no name at all, only the hash (cut down to maxLen if necessary) is returned so the
// result never starts with a dash; a non-positive maxLen yields the empty string.
func generateSafeName(maxLen int, nameParts ...string) string {
	name := strings.Join(nameParts, "-")
	// Replace dots with dashes as they're not allowed in resource names
	name = strings.ReplaceAll(name, ".", "-")

	// the hash is taken before truncation so different long names stay distinct
	unique := sha256.Sum256([]byte(name))
	uniqueStr := hex.EncodeToString(unique[:])[:8]

	// characters left for the readable prefix once "-<hash>" is accounted for
	budget := maxLen - len(uniqueStr) - 1
	if budget <= 0 || name == "" {
		switch {
		case maxLen <= 0:
			return ""
		case maxLen < len(uniqueStr):
			return uniqueStr[:maxLen]
		default:
			return uniqueStr
		}
	}

	if len(name) > budget {
		name = name[:budget]
	}

	return strings.ToLower(fmt.Sprintf("%s-%s", name, uniqueStr))
}
853

854
func (r *SkyhookReconciler) UpsertNodeLabelsAnnotationsPackages(ctx context.Context, skyhook *wrapper.Skyhook, node *corev1.Node) error {
8✔
855
        // No work to do if there is no labels or annotations for node
8✔
856
        if len(node.Labels) == 0 && len(node.Annotations) == 0 {
8✔
857
                return nil
×
858
        }
×
859

860
        annotations, err := json.Marshal(node.Annotations)
8✔
861
        if err != nil {
8✔
862
                return fmt.Errorf("error converting annotations into byte array: %w", err)
×
863
        }
×
864

865
        labels, err := json.Marshal(node.Labels)
8✔
866
        if err != nil {
8✔
867
                return fmt.Errorf("error converting labels into byte array: %w", err)
×
868
        }
×
869

870
        // marshal intermediary package metadata for the agent
871
        metadata := NewSkyhookMetadata(r.opts, skyhook)
8✔
872
        packages, err := metadata.Marshal()
8✔
873
        if err != nil {
8✔
874
                return fmt.Errorf("error converting packages into byte array: %w", err)
×
875
        }
×
876

877
        configMapName := generateSafeName(253, skyhook.Name, node.Name, "metadata")
8✔
878
        newCM := &corev1.ConfigMap{
8✔
879
                ObjectMeta: metav1.ObjectMeta{
8✔
880
                        Name:      configMapName,
8✔
881
                        Namespace: r.opts.Namespace,
8✔
882
                        Labels: map[string]string{
8✔
883
                                fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhook.Name,
8✔
884
                        },
8✔
885
                        Annotations: map[string]string{
8✔
886
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
8✔
887
                                fmt.Sprintf("%s/Node.name", v1alpha1.METADATA_PREFIX): node.Name,
8✔
888
                        },
8✔
889
                },
8✔
890
                Data: map[string]string{
8✔
891
                        "annotations.json": string(annotations),
8✔
892
                        "labels.json":      string(labels),
8✔
893
                        "packages.json":    string(packages),
8✔
894
                },
8✔
895
        }
8✔
896

8✔
897
        if err := ctrl.SetControllerReference(skyhook.Skyhook, newCM, r.scheme); err != nil {
8✔
898
                return fmt.Errorf("error setting ownership: %w", err)
×
899
        }
×
900

901
        existingConfigMap := &corev1.ConfigMap{}
8✔
902
        err = r.Get(ctx, client.ObjectKey{Namespace: r.opts.Namespace, Name: configMapName}, existingConfigMap)
8✔
903
        if err != nil {
16✔
904
                if apierrors.IsNotFound(err) {
16✔
905
                        // create
8✔
906
                        err := r.Create(ctx, newCM)
8✔
907
                        if err != nil {
8✔
908
                                return fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
909
                        }
×
910
                } else {
×
911
                        return fmt.Errorf("error getting config map: %w", err)
×
912
                }
×
913
        } else {
7✔
914
                if !reflect.DeepEqual(existingConfigMap.Data, newCM.Data) {
14✔
915
                        // update
7✔
916
                        err := r.Update(ctx, newCM)
7✔
917
                        if err != nil {
7✔
918
                                return fmt.Errorf("error updating config map [%s]: %w", newCM.Name, err)
×
919
                        }
×
920
                }
921
        }
922

923
        return nil
8✔
924
}
925

926
// HandleConfigUpdates checks whether the configMap on a package was updated and if it was the configmap will
927
// be updated and the package will be put into config mode if the package is complete or erroring
928
func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes, _package v1alpha1.Package, oldConfigMap, newConfigMap *corev1.ConfigMap) (bool, error) {
7✔
929
        completedNodes, nodeCount := 0, len(skyhook.GetNodes())
7✔
930
        erroringNode := false
7✔
931

7✔
932
        // if configmap changed
7✔
933
        if !reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) {
12✔
934
                for _, node := range skyhook.GetNodes() {
10✔
935
                        exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package)
5✔
936
                        if err != nil {
5✔
937
                                return false, err
×
938
                        }
×
939

940
                        if !exists && node.IsPackageComplete(_package) {
10✔
941
                                completedNodes++
5✔
942
                        }
5✔
943

944
                        // if we have an erroring node in the config, interrupt, or post-interrupt mode
945
                        // then we will restart the config changes
946
                        if packageStatus, found := node.PackageStatus(_package.GetUniqueName()); found {
10✔
947
                                switch packageStatus.Stage {
5✔
948
                                case v1alpha1.StageConfig, v1alpha1.StageInterrupt, v1alpha1.StagePostInterrupt:
5✔
949
                                        if packageStatus.State == v1alpha1.StateErroring {
5✔
950
                                                erroringNode = true
×
951

×
952
                                                // delete the erroring pod from the node so that it can be recreated
×
953
                                                // with the updated configmap
×
954
                                                pods, err := r.dal.GetPods(ctx,
×
955
                                                        client.MatchingFields{
×
956
                                                                "spec.nodeName": node.GetNode().Name,
×
957
                                                        },
×
958
                                                        client.MatchingLabels{
×
959
                                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.GetSkyhook().Name,
×
960
                                                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
×
961
                                                        },
×
962
                                                )
×
963
                                                if err != nil {
×
964
                                                        return false, err
×
965
                                                }
×
966

967
                                                if pods != nil {
×
968
                                                        for _, pod := range pods.Items {
×
969
                                                                err := r.Delete(ctx, &pod)
×
970
                                                                if err != nil {
×
971
                                                                        return false, err
×
972
                                                                }
×
973
                                                        }
974
                                                }
975
                                        }
976
                                }
977
                        }
978
                }
979

980
                // if the update is complete or there is an erroring node put the package back into
981
                // the config mode and update the config map
982
                if completedNodes == nodeCount || erroringNode {
10✔
983
                        // get the keys in the configmap that changed
5✔
984
                        newConfigUpdates := make([]string, 0)
5✔
985
                        for key, new_val := range newConfigMap.Data {
10✔
986
                                if old_val, exists := oldConfigMap.Data[key]; !exists || old_val != new_val {
10✔
987
                                        newConfigUpdates = append(newConfigUpdates, key)
5✔
988
                                }
5✔
989
                        }
990

991
                        // if updates completed then clear out old config updates as they are finished
992
                        if completedNodes == nodeCount {
10✔
993
                                skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
994
                        }
5✔
995

996
                        // Add the new changed keys to the config updates
997
                        skyhook.GetSkyhook().AddConfigUpdates(_package.Name, newConfigUpdates...)
5✔
998

5✔
999
                        for _, node := range skyhook.GetNodes() {
10✔
1000
                                err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageConfig, 0, _package.ContainerSHA)
5✔
1001
                                if err != nil {
5✔
1002
                                        return false, fmt.Errorf("error upserting node status [%s]: %w", node.GetNode().Name, err)
×
1003
                                }
×
1004

1005
                                node.SetStatus(v1alpha1.StatusInProgress)
5✔
1006
                        }
1007

1008
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
5✔
1009
                        if len(errs) > 0 {
5✔
1010
                                return false, utilerrors.NewAggregate(errs)
×
1011
                        }
×
1012

1013
                        // update config map
1014
                        err := r.Update(ctx, newConfigMap)
5✔
1015
                        if err != nil {
5✔
1016
                                return false, fmt.Errorf("error updating config map [%s]: %w", newConfigMap.Name, err)
×
1017
                        }
×
1018

1019
                        return true, nil
5✔
1020
                }
1021
        }
1022

1023
        return false, nil
7✔
1024
}
1025

1026
func (r *SkyhookReconciler) UpsertConfigmaps(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, error) {
7✔
1027
        updated := false
7✔
1028

7✔
1029
        var list corev1.ConfigMapList
7✔
1030
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name})
7✔
1031
        if err != nil {
7✔
1032
                return false, fmt.Errorf("error listing config maps while upserting: %w", err)
×
1033
        }
×
1034

1035
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
1036
        for _, cm := range list.Items {
14✔
1037
                existingCMs[cm.Name] = cm
7✔
1038
        }
7✔
1039

1040
        // clean up from an update
1041
        shouldExist := make(map[string]struct{})
7✔
1042
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
1043
                shouldExist[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))] = struct{}{}
7✔
1044
        }
7✔
1045

1046
        for k, v := range existingCMs {
14✔
1047
                if _, ok := shouldExist[k]; !ok {
12✔
1048
                        // delete
5✔
1049
                        err := r.Delete(ctx, &v)
5✔
1050
                        if err != nil {
5✔
1051
                                return false, fmt.Errorf("error deleting existing config map [%s] while upserting: %w", v.Name, err)
×
1052
                        }
×
1053
                }
1054
        }
1055

1056
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
1057
                if len(_package.ConfigMap) > 0 {
14✔
1058

7✔
1059
                        newCM := &corev1.ConfigMap{
7✔
1060
                                ObjectMeta: metav1.ObjectMeta{
7✔
1061
                                        Name:      strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version)),
7✔
1062
                                        Namespace: r.opts.Namespace,
7✔
1063
                                        Labels: map[string]string{
7✔
1064
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
7✔
1065
                                        },
7✔
1066
                                        Annotations: map[string]string{
7✔
1067
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):            skyhook.GetSkyhook().Name,
7✔
1068
                                                fmt.Sprintf("%s/Package.Name", v1alpha1.METADATA_PREFIX):    _package.Name,
7✔
1069
                                                fmt.Sprintf("%s/Package.Version", v1alpha1.METADATA_PREFIX): _package.Version,
7✔
1070
                                        },
7✔
1071
                                },
7✔
1072
                                Data: _package.ConfigMap,
7✔
1073
                        }
7✔
1074
                        // set owner of CM to the SCR, which will clean up the CM in delete of the SCR
7✔
1075
                        if err := ctrl.SetControllerReference(skyhook.GetSkyhook().Skyhook, newCM, r.scheme); err != nil {
7✔
1076
                                return false, fmt.Errorf("error setting ownership of cm: %w", err)
×
1077
                        }
×
1078

1079
                        if existingCM, ok := existingCMs[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))]; ok {
14✔
1080
                                updatedConfigMap, err := r.HandleConfigUpdates(ctx, clusterState, skyhook, _package, &existingCM, newCM)
7✔
1081
                                if err != nil {
7✔
1082
                                        return false, fmt.Errorf("error updating config map [%s]: %s", newCM.Name, err)
×
1083
                                }
×
1084
                                if updatedConfigMap {
12✔
1085
                                        updated = true
5✔
1086
                                }
5✔
1087
                        } else {
7✔
1088
                                // create
7✔
1089
                                err := r.Create(ctx, newCM)
7✔
1090
                                if err != nil {
7✔
1091
                                        return false, fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
1092
                                }
×
1093
                        }
1094
                }
1095
        }
1096

1097
        return updated, nil
7✔
1098
}
1099

1100
func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1101

5✔
1102
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1103
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1104
        })
5✔
1105
        if err != nil {
5✔
1106
                return false, err
×
1107
        }
×
1108

1109
        if pods == nil || len(pods.Items) == 0 {
5✔
1110
                return true, nil
×
1111
        }
×
1112

1113
        // checking for any running or pending pods with no toleration to unschedulable
1114
        // if its has an unschedulable toleration we can ignore
1115
        for _, pod := range pods.Items {
10✔
1116

5✔
1117
                if ShouldEvict(&pod) {
10✔
1118
                        return false, nil
5✔
1119
                }
5✔
1120

1121
        }
1122

1123
        return true, nil
5✔
1124
}
1125

1126
func ShouldEvict(pod *corev1.Pod) bool {
5✔
1127
        switch pod.Status.Phase {
5✔
1128
        case corev1.PodRunning, corev1.PodPending:
5✔
1129

5✔
1130
                for _, taint := range pod.Spec.Tolerations {
10✔
1131
                        switch taint.Key {
5✔
1132
                        case "node.kubernetes.io/unschedulable": // ignoring
5✔
1133
                                return false
5✔
1134
                        }
1135
                }
1136

1137
                if len(pod.ObjectMeta.OwnerReferences) > 1 {
5✔
1138
                        for _, owner := range pod.ObjectMeta.OwnerReferences {
×
1139
                                if owner.Kind == "DaemonSet" { // ignoring
×
1140
                                        return false
×
1141
                                }
×
1142
                        }
1143
                }
1144

1145
                if pod.GetNamespace() == "kube-system" {
5✔
1146
                        return false
×
1147
                }
×
1148

1149
                return true
5✔
1150
        }
1151
        return false
5✔
1152
}
1153

1154
// HandleFinalizer returns true only if we container is deleted and we handled it completely, else false
1155
func (r *SkyhookReconciler) HandleFinalizer(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1156
        if skyhook.GetSkyhook().DeletionTimestamp.IsZero() { // if not deleted, and does not have our finalizer, add it
14✔
1157
                if !controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1158
                        controllerutil.AddFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1159

7✔
1160
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
7✔
1161
                                return false, fmt.Errorf("error updating skyhook to add finalizer: %w", err)
×
1162
                        }
×
1163
                }
1164
        } else { // being delete, time to handle our
7✔
1165
                if controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1166

7✔
1167
                        errs := make([]error, 0)
7✔
1168

7✔
1169
                        // zero out all the metrics related to this skyhook both skyhook and packages
7✔
1170
                        zeroOutSkyhookMetrics(skyhook)
7✔
1171

7✔
1172
                        for _, node := range skyhook.GetNodes() {
14✔
1173
                                patch := client.StrategicMergeFrom(node.GetNode().DeepCopy())
7✔
1174

7✔
1175
                                node.Uncordon()
7✔
1176

7✔
1177
                                // if this doesn't change the node then don't patch
7✔
1178
                                if !node.Changed() {
14✔
1179
                                        continue
7✔
1180
                                }
1181

1182
                                err := r.Patch(ctx, node.GetNode(), patch)
4✔
1183
                                if err != nil {
4✔
1184
                                        errs = append(errs, fmt.Errorf("error patching node [%s] in finalizer: %w", node.GetNode().Name, err))
×
1185
                                }
×
1186
                        }
1187

1188
                        if len(errs) > 0 { // we errored, so we need to return error, otherwise we would release the skyhook when we didnt finish
7✔
1189
                                return false, utilerrors.NewAggregate(errs)
×
1190
                        }
×
1191

1192
                        controllerutil.RemoveFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1193
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
11✔
1194
                                return false, fmt.Errorf("error updating skyhook removing finalizer: %w", err)
4✔
1195
                        }
4✔
1196
                        // should be 1, and now 2. we want to set ObservedGeneration up to not trigger an logic from this update adding the finalizer
1197
                        skyhook.GetSkyhook().Status.ObservedGeneration = skyhook.GetSkyhook().Status.ObservedGeneration + 1
7✔
1198

7✔
1199
                        if err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
1200
                                return false, fmt.Errorf("error updating skyhook status: %w", err)
7✔
1201
                        }
7✔
1202

1203
                        return true, nil
×
1204
                }
1205
        }
1206
        return false, nil
7✔
1207
}
1208

1209
// HasNonInterruptWork returns true if pods are running on the node that are either packages, or matches the SCR selector
1210
func (r *SkyhookReconciler) HasNonInterruptWork(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1211

5✔
1212
        selector, err := metav1.LabelSelectorAsSelector(&skyhookNode.GetSkyhook().Spec.PodNonInterruptLabels)
5✔
1213
        if err != nil {
5✔
1214
                return false, fmt.Errorf("error creating selector: %w", err)
×
1215
        }
×
1216

1217
        if selector.Empty() { // when selector is empty it does not do any selecting, ie will return all pods on node.
10✔
1218
                return false, nil
5✔
1219
        }
5✔
1220

1221
        pods, err := r.dal.GetPods(ctx,
5✔
1222
                client.MatchingLabelsSelector{Selector: selector},
5✔
1223
                client.MatchingFields{
5✔
1224
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1225
                },
5✔
1226
        )
5✔
1227
        if err != nil {
5✔
1228
                return false, fmt.Errorf("error getting pods: %w", err)
×
1229
        }
×
1230

1231
        if pods == nil || len(pods.Items) == 0 {
10✔
1232
                return false, nil
5✔
1233
        }
5✔
1234

1235
        for _, pod := range pods.Items {
10✔
1236
                switch pod.Status.Phase {
5✔
1237
                case corev1.PodRunning, corev1.PodPending:
5✔
1238
                        return true, nil
5✔
1239
                }
1240
        }
1241

1242
        return false, nil
×
1243
}
1244

1245
func (r *SkyhookReconciler) HasRunningPackages(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1246
        pods, err := r.dal.GetPods(ctx,
5✔
1247
                client.HasLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX)},
5✔
1248
                client.MatchingFields{
5✔
1249
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1250
                },
5✔
1251
        )
5✔
1252
        if err != nil {
5✔
1253
                return false, fmt.Errorf("error getting pods: %w", err)
×
1254
        }
×
1255

1256
        return pods != nil && len(pods.Items) > 0, nil
5✔
1257
}
1258

1259
func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
5✔
1260
        drained, err := r.IsDrained(ctx, skyhookNode)
5✔
1261
        if err != nil {
5✔
1262
                return false, err
×
1263
        }
×
1264
        if drained {
10✔
1265
                return true, nil
5✔
1266
        }
5✔
1267

1268
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1269
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1270
        })
5✔
1271
        if err != nil {
5✔
1272
                return false, err
×
1273
        }
×
1274

1275
        if pods == nil || len(pods.Items) == 0 {
5✔
1276
                return true, nil
×
1277
        }
×
1278

1279
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1280
                "draining node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1281
                skyhookNode.GetNode().Name,
5✔
1282
                _package.Name,
5✔
1283
                _package.Version,
5✔
1284
                skyhookNode.GetSkyhook().Name,
5✔
1285
        )
5✔
1286

5✔
1287
        errs := make([]error, 0)
5✔
1288
        for _, pod := range pods.Items {
10✔
1289

5✔
1290
                if ShouldEvict(&pod) {
10✔
1291
                        eviction := policyv1.Eviction{}
5✔
1292
                        err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction)
5✔
1293
                        if err != nil {
5✔
1294
                                errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err))
×
1295
                        }
×
1296
                }
1297
        }
1298

1299
        return len(errs) == 0, utilerrors.NewAggregate(errs)
5✔
1300
}
1301

1302
// Interrupt should not be called unless safe to do so, IE already cordoned and drained
1303
func (r *SkyhookReconciler) Interrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, _interrupt *v1alpha1.Interrupt) error {
5✔
1304

5✔
1305
        hasPackagesRunning, err := r.HasRunningPackages(ctx, skyhookNode)
5✔
1306
        if err != nil {
5✔
1307
                return err
×
1308
        }
×
1309

1310
        if hasPackagesRunning { // keep waiting...
10✔
1311
                return nil
5✔
1312
        }
5✔
1313

1314
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
5✔
1315
        if err != nil {
5✔
1316
                return err
×
1317
        }
×
1318
        if exists {
5✔
1319
                // nothing to do here, already running
×
1320
                return nil
×
1321
        }
×
1322

1323
        // Ensure the node metadata configmap exists before creating the pod
1324
        // This prevents a race where the pod starts before its required configmap is created
1325
        if err := r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhookNode.GetSkyhook(), skyhookNode.GetNode()); err != nil {
5✔
1326
                return fmt.Errorf("error upserting node metadata configmap: %w", err)
×
1327
        }
×
1328

1329
        argEncode, err := _interrupt.ToArgs()
5✔
1330
        if err != nil {
5✔
1331
                return fmt.Errorf("error creating interrupt args: %w", err)
×
1332
        }
×
1333

1334
        pod := createInterruptPodForPackage(r.opts, _interrupt, argEncode, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name)
5✔
1335

5✔
1336
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, v1alpha1.StageInterrupt, _package); err != nil {
5✔
1337
                return fmt.Errorf("error setting package on interrupt: %w", err)
×
1338
        }
×
1339

1340
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
5✔
1341
                return fmt.Errorf("error setting ownership: %w", err)
×
1342
        }
×
1343

1344
        if err := r.Create(ctx, pod); err != nil {
5✔
1345
                return fmt.Errorf("error creating interruption pod: %w", err)
×
1346
        }
×
1347

1348
        _ = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageInterrupt, 0, _package.ContainerSHA)
5✔
1349

5✔
1350
        r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1351
                "Interrupting node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1352
                skyhookNode.GetNode().Name,
5✔
1353
                _package.Name,
5✔
1354
                _package.Version,
5✔
1355
                skyhookNode.GetSkyhook().Name)
5✔
1356

5✔
1357
        return nil
5✔
1358
}
1359

1360
// fudgeInterruptWithPriority takes a list of packages, interrupts, and configUpdates and returns the correct merged interrupt to run to handle all the packages
1361
func fudgeInterruptWithPriority(next []*v1alpha1.Package, configUpdates map[string][]string, interrupts map[string][]*v1alpha1.Interrupt) (*v1alpha1.Interrupt, string) {
8✔
1362
        var ret *v1alpha1.Interrupt
8✔
1363
        var pack string
8✔
1364

8✔
1365
        // map interrupt to priority
8✔
1366
        // A lower priority value means a higher priority and will be used in favor of anything with a higher value
8✔
1367
        var priorities = map[v1alpha1.InterruptType]int{
8✔
1368
                v1alpha1.REBOOT:               0,
8✔
1369
                v1alpha1.RESTART_ALL_SERVICES: 1,
8✔
1370
                v1alpha1.SERVICE:              2,
8✔
1371
                v1alpha1.NOOP:                 3,
8✔
1372
        }
8✔
1373

8✔
1374
        for _, _package := range next {
16✔
1375

8✔
1376
                if len(configUpdates[_package.Name]) == 0 {
16✔
1377
                        interrupts[_package.Name] = []*v1alpha1.Interrupt{}
8✔
1378
                        if _package.HasInterrupt() {
14✔
1379
                                interrupts[_package.Name] = append(interrupts[_package.Name], _package.Interrupt)
6✔
1380
                        }
6✔
1381
                }
1382
        }
1383

1384
        packageNames := make([]string, 0)
8✔
1385
        for _, pkg := range next {
16✔
1386
                packageNames = append(packageNames, pkg.Name)
8✔
1387
        }
8✔
1388
        sort.Strings(packageNames)
8✔
1389

8✔
1390
        for _, _package := range packageNames {
16✔
1391
                _interrupts, ok := interrupts[_package]
8✔
1392
                if !ok {
14✔
1393
                        continue
6✔
1394
                }
1395

1396
                for _, interrupt := range _interrupts {
14✔
1397
                        if ret == nil { // prime ret, base case
12✔
1398
                                ret = interrupt
6✔
1399
                                pack = _package
6✔
1400
                        }
6✔
1401

1402
                        // short circuit, reboot has highest priority
1403
                        switch interrupt.Type {
6✔
1404
                        case v1alpha1.REBOOT:
6✔
1405
                                return interrupt, _package
6✔
1406
                        }
1407

1408
                        // check if interrupt is higher priority using the priority_order
1409
                        // A lower priority value means a higher priority
1410
                        if priorities[interrupt.Type] < priorities[ret.Type] {
7✔
1411
                                ret = interrupt
1✔
1412
                                pack = _package
1✔
1413
                        } else if priorities[interrupt.Type] == priorities[ret.Type] {
13✔
1414
                                mergeInterrupt(ret, interrupt)
6✔
1415
                        }
6✔
1416
                }
1417
        }
1418

1419
        return ret, pack // return merged interrupt and package
8✔
1420
}
1421

1422
func mergeInterrupt(left, right *v1alpha1.Interrupt) {
6✔
1423

6✔
1424
        // make sure both are of type service
6✔
1425
        if left.Type != v1alpha1.SERVICE || right.Type != v1alpha1.SERVICE {
7✔
1426
                return
1✔
1427
        }
1✔
1428

1429
        left.Services = merge(left.Services, right.Services)
6✔
1430
}
1431

1432
// merge appends every element of right that is not yet present in left, sorts the
// result in ascending order and returns it. The sort is done in place, so the backing
// array of left may be reordered (and, if it has spare capacity, extended).
func merge[T cmp.Ordered](left, right []T) []T {
	for i := range right {
		if slices.Index(left, right[i]) < 0 {
			left = append(left, right[i])
		}
	}

	slices.Sort(left)

	return left
}
1441

1442
// ValidateNodeConfigmaps validates that there are no orphaned or stale config maps for a node
1443
func (r *SkyhookReconciler) ValidateNodeConfigmaps(ctx context.Context, skyhookName string, nodes []wrapper.SkyhookNode) (bool, error) {
7✔
1444
        var list corev1.ConfigMapList
7✔
1445
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhookName})
7✔
1446
        if err != nil {
7✔
1447
                return false, fmt.Errorf("error listing config maps: %w", err)
×
1448
        }
×
1449

1450
        // No configmaps created by this skyhook, no work needs to be done
1451
        if len(list.Items) == 0 {
14✔
1452
                return false, nil
7✔
1453
        }
7✔
1454

1455
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
1456
        for _, cm := range list.Items {
14✔
1457
                existingCMs[cm.Name] = cm
7✔
1458
        }
7✔
1459

1460
        shouldExist := make(map[string]struct{})
7✔
1461
        for _, node := range nodes {
14✔
1462
                shouldExist[generateSafeName(253, skyhookName, node.GetNode().Name, "metadata")] = struct{}{}
7✔
1463
        }
7✔
1464

1465
        update := false
7✔
1466
        errs := make([]error, 0)
7✔
1467
        for k, v := range existingCMs {
14✔
1468
                if _, ok := shouldExist[k]; !ok {
7✔
UNCOV
1469
                        update = true
×
UNCOV
1470
                        err := r.Delete(ctx, &v)
×
UNCOV
1471
                        if err != nil {
×
1472
                                errs = append(errs, fmt.Errorf("error deleting existing config map [%s]: %w", v.Name, err))
×
1473
                        }
×
1474
                }
1475
        }
1476

1477
        // Ensure packages.json is present and up-to-date for expected configmaps
1478
        skyhookCR, err := r.dal.GetSkyhook(ctx, skyhookName)
7✔
1479
        if err != nil {
7✔
1480
                return update, fmt.Errorf("error getting skyhook for metadata validation: %w", err)
×
1481
        }
×
1482
        skyhookWrapper := wrapper.NewSkyhookWrapper(skyhookCR)
7✔
1483
        metadata := NewSkyhookMetadata(r.opts, skyhookWrapper)
7✔
1484
        expectedBytes, err := metadata.Marshal()
7✔
1485
        if err != nil {
7✔
1486
                return update, fmt.Errorf("error marshalling metadata for validation: %w", err)
×
1487
        }
×
1488
        expected := string(expectedBytes)
7✔
1489

7✔
1490
        for i := range list.Items {
14✔
1491
                cm := &list.Items[i]
7✔
1492
                if _, ok := shouldExist[cm.Name]; !ok {
7✔
UNCOV
1493
                        continue
×
1494
                }
1495
                if cm.Data == nil {
7✔
1496
                        cm.Data = make(map[string]string)
×
1497
                }
×
1498
                if cm.Data["packages.json"] != expected {
12✔
1499
                        cm.Data["packages.json"] = expected
5✔
1500
                        if err := r.Update(ctx, cm); err != nil {
6✔
1501
                                errs = append(errs, fmt.Errorf("error updating packages.json on config map [%s]: %w", cm.Name, err))
1✔
1502
                        } else {
6✔
1503
                                update = true
5✔
1504
                        }
5✔
1505
                }
1506
        }
1507

1508
        return update, utilerrors.NewAggregate(errs)
7✔
1509
}
1510

1511
// PodExists tests if this package is exists on a node.
1512
func (r *SkyhookReconciler) PodExists(ctx context.Context, nodeName, skyhookName string, _package *v1alpha1.Package) (bool, error) {
7✔
1513

7✔
1514
        pods, err := r.dal.GetPods(ctx,
7✔
1515
                client.MatchingFields{
7✔
1516
                        "spec.nodeName": nodeName,
7✔
1517
                },
7✔
1518
                client.MatchingLabels{
7✔
1519
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhookName,
7✔
1520
                        fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
7✔
1521
                },
7✔
1522
        )
7✔
1523
        if err != nil {
7✔
1524
                return false, fmt.Errorf("error check from existing pods: %w", err)
×
1525
        }
×
1526

1527
        if pods == nil || len(pods.Items) == 0 {
14✔
1528
                return false, nil
7✔
1529
        }
7✔
1530
        return true, nil
7✔
1531
}
1532

1533
// createInterruptPodForPackage returns the pod spec for an interrupt pod given an package
1534
func createInterruptPodForPackage(opts SkyhookOperatorOptions, _interrupt *v1alpha1.Interrupt, argEncode string, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string) *corev1.Pod {
6✔
1535
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
6✔
1536
                opts.CopyDirRoot,
6✔
1537
                skyhook.Name,
6✔
1538
                _package.Name,
6✔
1539
                _package.Version,
6✔
1540
                skyhook.UID,
6✔
1541
                skyhook.Generation,
6✔
1542
        )
6✔
1543

6✔
1544
        volumes := []corev1.Volume{
6✔
1545
                {
6✔
1546
                        Name: "root-mount",
6✔
1547
                        VolumeSource: corev1.VolumeSource{
6✔
1548
                                HostPath: &corev1.HostPathVolumeSource{
6✔
1549
                                        Path: "/",
6✔
1550
                                },
6✔
1551
                        },
6✔
1552
                },
6✔
1553
                {
6✔
1554
                        // node names in different CSPs might include dots which isn't allowed in volume names
6✔
1555
                        // so we have to replace all dots with dashes
6✔
1556
                        Name: generateSafeName(63, skyhook.Name, nodeName, "metadata"),
6✔
1557
                        VolumeSource: corev1.VolumeSource{
6✔
1558
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1559
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1560
                                                Name: strings.ReplaceAll(fmt.Sprintf("%s-%s-metadata", skyhook.Name, nodeName), ".", "-"),
6✔
1561
                                        },
6✔
1562
                                },
6✔
1563
                        },
6✔
1564
                },
6✔
1565
        }
6✔
1566
        volumeMounts := []corev1.VolumeMount{
6✔
1567
                {
6✔
1568
                        Name:             "root-mount",
6✔
1569
                        MountPath:        "/root",
6✔
1570
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
6✔
1571
                },
6✔
1572
        }
6✔
1573

6✔
1574
        pod := &corev1.Pod{
6✔
1575
                ObjectMeta: metav1.ObjectMeta{
6✔
1576
                        Name:      generateSafeName(63, skyhook.Name, "interrupt", string(_interrupt.Type), nodeName),
6✔
1577
                        Namespace: opts.Namespace,
6✔
1578
                        Labels: map[string]string{
6✔
1579
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
6✔
1580
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX):   fmt.Sprintf("%s-%s", _package.Name, _package.Version),
6✔
1581
                                fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX): "True",
6✔
1582
                        },
6✔
1583
                },
6✔
1584
                Spec: corev1.PodSpec{
6✔
1585
                        NodeName:      nodeName,
6✔
1586
                        RestartPolicy: corev1.RestartPolicyOnFailure,
6✔
1587
                        InitContainers: []corev1.Container{
6✔
1588
                                {
6✔
1589
                                        Name:  InterruptContainerName,
6✔
1590
                                        Image: getAgentImage(opts, _package),
6✔
1591
                                        Args:  []string{"interrupt", "/root", copyDir, argEncode},
6✔
1592
                                        Env:   getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name),
6✔
1593
                                        SecurityContext: &corev1.SecurityContext{
6✔
1594
                                                Privileged: ptr(true),
6✔
1595
                                        },
6✔
1596
                                        VolumeMounts: volumeMounts,
6✔
1597
                                        Resources: corev1.ResourceRequirements{
6✔
1598
                                                Limits: corev1.ResourceList{
6✔
1599
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1600
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1601
                                                },
6✔
1602
                                                Requests: corev1.ResourceList{
6✔
1603
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1604
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1605
                                                },
6✔
1606
                                        },
6✔
1607
                                },
6✔
1608
                        },
6✔
1609
                        Containers: []corev1.Container{
6✔
1610
                                {
6✔
1611
                                        Name:  "pause",
6✔
1612
                                        Image: opts.PauseImage,
6✔
1613
                                        Resources: corev1.ResourceRequirements{
6✔
1614
                                                Limits: corev1.ResourceList{
6✔
1615
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1616
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1617
                                                },
6✔
1618
                                                Requests: corev1.ResourceList{
6✔
1619
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1620
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1621
                                                },
6✔
1622
                                        },
6✔
1623
                                },
6✔
1624
                        },
6✔
1625
                        HostPID:     true,
6✔
1626
                        HostNetwork: true,
6✔
1627
                        // If you change these go change the SelectNode toleration in cluster_state.go
6✔
1628
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
6✔
1629
                                {
6✔
1630
                                        Key:      TaintUnschedulable,
6✔
1631
                                        Operator: corev1.TolerationOpExists,
6✔
1632
                                },
6✔
1633
                                opts.GetRuntimeRequiredToleration(),
6✔
1634
                        }, skyhook.Spec.AdditionalTolerations...),
6✔
1635
                        Volumes: volumes,
6✔
1636
                },
6✔
1637
        }
6✔
1638
        if opts.ImagePullSecret != "" {
6✔
1639
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1640
                        {
×
1641
                                Name: opts.ImagePullSecret,
×
1642
                        },
×
1643
                }
×
1644
        }
×
1645
        return pod
6✔
1646
}
1647

1648
// trunstr returns str cut down to at most length bytes. Strings that already fit are
// returned unchanged.
func trunstr(str string, length int) string {
	if len(str) <= length {
		return str
	}
	return str[:length]
}
1654

1655
func getAgentImage(opts SkyhookOperatorOptions, _package *v1alpha1.Package) string {
8✔
1656
        if _package.AgentImageOverride != "" {
13✔
1657
                return _package.AgentImageOverride
5✔
1658
        }
5✔
1659
        return opts.AgentImage
8✔
1660
}
1661

1662
// getPackageImage returns the full image reference for a package, using the digest if specified
1663
func getPackageImage(_package *v1alpha1.Package) string {
8✔
1664
        if _package.ContainerSHA != "" {
13✔
1665
                // When containerSHA is specified, use it instead of the version tag for immutable image reference
5✔
1666
                return fmt.Sprintf("%s@%s", _package.Image, _package.ContainerSHA)
5✔
1667
        }
5✔
1668
        // Fall back to version tag
1669
        return fmt.Sprintf("%s:%s", _package.Image, _package.Version)
8✔
1670
}
1671

1672
func getAgentConfigEnvVars(opts SkyhookOperatorOptions, packageName string, packageVersion string, resourceID string, skyhookName string) []corev1.EnvVar {
8✔
1673
        return []corev1.EnvVar{
8✔
1674
                {
8✔
1675
                        Name:  "SKYHOOK_LOG_DIR",
8✔
1676
                        Value: fmt.Sprintf("%s/%s", opts.AgentLogRoot, skyhookName),
8✔
1677
                },
8✔
1678
                {
8✔
1679
                        Name:  "SKYHOOK_ROOT_DIR",
8✔
1680
                        Value: fmt.Sprintf("%s/%s", opts.CopyDirRoot, skyhookName),
8✔
1681
                },
8✔
1682
                {
8✔
1683
                        Name:  "COPY_RESOLV",
8✔
1684
                        Value: "false",
8✔
1685
                },
8✔
1686
                {
8✔
1687
                        Name:  "SKYHOOK_RESOURCE_ID",
8✔
1688
                        Value: fmt.Sprintf("%s_%s_%s", resourceID, packageName, packageVersion),
8✔
1689
                },
8✔
1690
        }
8✔
1691
}
8✔
1692

1693
// createPodFromPackage creates a pod spec for a skyhook pod for a given package
1694
func createPodFromPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string, stage v1alpha1.Stage) *corev1.Pod {
8✔
1695
        // Generate consistent names that won't exceed k8s limits
8✔
1696
        volumeName := generateSafeName(63, "metadata", nodeName)
8✔
1697
        configMapName := generateSafeName(253, skyhook.Name, nodeName, "metadata")
8✔
1698

8✔
1699
        volumes := []corev1.Volume{
8✔
1700
                {
8✔
1701
                        Name: "root-mount",
8✔
1702
                        VolumeSource: corev1.VolumeSource{
8✔
1703
                                HostPath: &corev1.HostPathVolumeSource{
8✔
1704
                                        Path: "/",
8✔
1705
                                },
8✔
1706
                        },
8✔
1707
                },
8✔
1708
                {
8✔
1709
                        Name: volumeName,
8✔
1710
                        VolumeSource: corev1.VolumeSource{
8✔
1711
                                ConfigMap: &corev1.ConfigMapVolumeSource{
8✔
1712
                                        LocalObjectReference: corev1.LocalObjectReference{
8✔
1713
                                                Name: configMapName,
8✔
1714
                                        },
8✔
1715
                                },
8✔
1716
                        },
8✔
1717
                },
8✔
1718
        }
8✔
1719

8✔
1720
        volumeMounts := []corev1.VolumeMount{
8✔
1721
                {
8✔
1722
                        Name:             "root-mount",
8✔
1723
                        MountPath:        "/root",
8✔
1724
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
8✔
1725
                },
8✔
1726
                {
8✔
1727
                        Name:      volumeName,
8✔
1728
                        MountPath: "/skyhook-package/node-metadata",
8✔
1729
                },
8✔
1730
        }
8✔
1731

8✔
1732
        if len(_package.ConfigMap) > 0 {
15✔
1733
                volumeMounts = append(volumeMounts, corev1.VolumeMount{
7✔
1734
                        Name:      _package.Name,
7✔
1735
                        MountPath: "/skyhook-package/configmaps",
7✔
1736
                })
7✔
1737

7✔
1738
                volumes = append(volumes, corev1.Volume{
7✔
1739
                        Name: _package.Name,
7✔
1740
                        VolumeSource: corev1.VolumeSource{
7✔
1741
                                ConfigMap: &corev1.ConfigMapVolumeSource{
7✔
1742
                                        LocalObjectReference: corev1.LocalObjectReference{
7✔
1743
                                                Name: strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.Name, _package.Name, _package.Version)),
7✔
1744
                                        },
7✔
1745
                                },
7✔
1746
                        },
7✔
1747
                })
7✔
1748
        }
7✔
1749

1750
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
8✔
1751
                opts.CopyDirRoot,
8✔
1752
                skyhook.Name,
8✔
1753
                _package.Name,
8✔
1754
                _package.Version,
8✔
1755
                skyhook.UID,
8✔
1756
                skyhook.Generation,
8✔
1757
        )
8✔
1758
        applyargs := []string{strings.ToLower(string(stage)), "/root", copyDir}
8✔
1759
        checkargs := []string{strings.ToLower(string(stage) + "-check"), "/root", copyDir}
8✔
1760

8✔
1761
        agentEnvs := append(
8✔
1762
                _package.Env,
8✔
1763
                getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name)...,
8✔
1764
        )
8✔
1765

8✔
1766
        pod := &corev1.Pod{
8✔
1767
                ObjectMeta: metav1.ObjectMeta{
8✔
1768
                        Name:      generateSafeName(63, skyhook.Name, _package.Name, _package.Version, string(stage), nodeName),
8✔
1769
                        Namespace: opts.Namespace,
8✔
1770
                        Labels: map[string]string{
8✔
1771
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.Name,
8✔
1772
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
8✔
1773
                        },
8✔
1774
                },
8✔
1775
                Spec: corev1.PodSpec{
8✔
1776
                        NodeName:      nodeName,
8✔
1777
                        RestartPolicy: corev1.RestartPolicyOnFailure,
8✔
1778
                        InitContainers: []corev1.Container{
8✔
1779
                                {
8✔
1780
                                        Name:            fmt.Sprintf("%s-init", trunstr(_package.Name, 43)),
8✔
1781
                                        Image:           getPackageImage(_package),
8✔
1782
                                        ImagePullPolicy: "Always",
8✔
1783
                                        Command:         []string{"/bin/sh"},
8✔
1784
                                        Args: []string{
8✔
1785
                                                "-c",
8✔
1786
                                                "mkdir -p /root/${SKYHOOK_DIR} && cp -r /skyhook-package/* /root/${SKYHOOK_DIR}",
8✔
1787
                                        },
8✔
1788
                                        Env: []corev1.EnvVar{
8✔
1789
                                                {
8✔
1790
                                                        Name:  "SKYHOOK_DIR",
8✔
1791
                                                        Value: copyDir,
8✔
1792
                                                },
8✔
1793
                                        },
8✔
1794
                                        SecurityContext: &corev1.SecurityContext{
8✔
1795
                                                Privileged: ptr(true),
8✔
1796
                                        },
8✔
1797
                                        VolumeMounts: volumeMounts,
8✔
1798
                                },
8✔
1799
                                {
8✔
1800
                                        Name:            fmt.Sprintf("%s-%s", trunstr(_package.Name, 43), stage),
8✔
1801
                                        Image:           getAgentImage(opts, _package),
8✔
1802
                                        ImagePullPolicy: "Always",
8✔
1803
                                        Args:            applyargs,
8✔
1804
                                        Env:             agentEnvs,
8✔
1805
                                        SecurityContext: &corev1.SecurityContext{
8✔
1806
                                                Privileged: ptr(true),
8✔
1807
                                        },
8✔
1808
                                        VolumeMounts: volumeMounts,
8✔
1809
                                },
8✔
1810
                                {
8✔
1811
                                        Name:            fmt.Sprintf("%s-%scheck", trunstr(_package.Name, 43), stage),
8✔
1812
                                        Image:           getAgentImage(opts, _package),
8✔
1813
                                        ImagePullPolicy: "Always",
8✔
1814
                                        Args:            checkargs,
8✔
1815
                                        Env:             agentEnvs,
8✔
1816
                                        SecurityContext: &corev1.SecurityContext{
8✔
1817
                                                Privileged: ptr(true),
8✔
1818
                                        },
8✔
1819
                                        VolumeMounts: volumeMounts,
8✔
1820
                                },
8✔
1821
                        },
8✔
1822
                        Containers: []corev1.Container{
8✔
1823
                                {
8✔
1824
                                        Name:  "pause",
8✔
1825
                                        Image: opts.PauseImage,
8✔
1826
                                        Resources: corev1.ResourceRequirements{
8✔
1827
                                                Limits: corev1.ResourceList{
8✔
1828
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1829
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1830
                                                },
8✔
1831
                                                Requests: corev1.ResourceList{
8✔
1832
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1833
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1834
                                                },
8✔
1835
                                        },
8✔
1836
                                },
8✔
1837
                        },
8✔
1838
                        Volumes:     volumes,
8✔
1839
                        HostPID:     true,
8✔
1840
                        HostNetwork: true,
8✔
1841
                        // If you change these go change the SelectNode toleration in cluster_state.go
8✔
1842
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
8✔
1843
                                {
8✔
1844
                                        Key:      TaintUnschedulable,
8✔
1845
                                        Operator: corev1.TolerationOpExists,
8✔
1846
                                },
8✔
1847
                                opts.GetRuntimeRequiredToleration(),
8✔
1848
                        }, skyhook.Spec.AdditionalTolerations...),
8✔
1849
                },
8✔
1850
        }
8✔
1851
        if opts.ImagePullSecret != "" {
8✔
1852
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1853
                        {
×
1854
                                Name: opts.ImagePullSecret,
×
1855
                        },
×
1856
                }
×
1857
        }
×
1858
        if _package.GracefulShutdown != nil {
13✔
1859
                pod.Spec.TerminationGracePeriodSeconds = ptr(int64(_package.GracefulShutdown.Duration.Seconds()))
5✔
1860
        }
5✔
1861
        setPodResources(pod, _package.Resources)
8✔
1862
        return pod
8✔
1863
}
1864

1865
// FilterEnv removes the environment variables passed into exlude
1866
func FilterEnv(envs []corev1.EnvVar, exclude ...string) []corev1.EnvVar {
8✔
1867
        var filteredEnv []corev1.EnvVar
8✔
1868

8✔
1869
        // build map of exclude strings for faster lookup
8✔
1870
        excludeMap := make(map[string]struct{})
8✔
1871
        for _, name := range exclude {
16✔
1872
                excludeMap[name] = struct{}{}
8✔
1873
        }
8✔
1874

1875
        // If the environment variable name is in the exclude list, skip it
1876
        // otherwise append it to the final list
1877
        for _, env := range envs {
16✔
1878
                if _, found := excludeMap[env.Name]; !found {
16✔
1879
                        filteredEnv = append(filteredEnv, env)
8✔
1880
                }
8✔
1881
        }
1882

1883
        return filteredEnv
8✔
1884
}
1885

1886
// PodMatchesPackage asserts that a given pod matches the given pod spec
1887
func podMatchesPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, pod corev1.Pod, skyhook *wrapper.Skyhook, stage v1alpha1.Stage) bool {
8✔
1888
        var expectedPod *corev1.Pod
8✔
1889

8✔
1890
        // need to differentiate whether the pod is for an interrupt or not so we know
8✔
1891
        // what to expect and how to compare them
8✔
1892
        isInterrupt := false
8✔
1893
        _, limitRange := pod.Annotations["kubernetes.io/limit-ranger"]
8✔
1894

8✔
1895
        if pod.Labels[fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX)] == "True" {
14✔
1896
                expectedPod = createInterruptPodForPackage(opts, &v1alpha1.Interrupt{}, "", _package, skyhook, "")
6✔
1897
                isInterrupt = true
6✔
1898
        } else {
14✔
1899
                expectedPod = createPodFromPackage(opts, _package, skyhook, "", stage)
8✔
1900
        }
8✔
1901

1902
        actualPod := pod.DeepCopy()
8✔
1903

8✔
1904
        // check to see whether the name or the version of the package changed
8✔
1905
        packageLabel := fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX)
8✔
1906
        if actualPod.Labels[packageLabel] != expectedPod.Labels[packageLabel] {
14✔
1907
                return false
6✔
1908
        }
6✔
1909

1910
        // compare initContainers since this is where a lot of the important info lives
1911
        for i := range actualPod.Spec.InitContainers {
16✔
1912
                expectedContainer := expectedPod.Spec.InitContainers[i]
8✔
1913
                actualContainer := actualPod.Spec.InitContainers[i]
8✔
1914

8✔
1915
                if expectedContainer.Name != actualContainer.Name {
9✔
1916
                        return false
1✔
1917
                }
1✔
1918

1919
                if expectedContainer.Image != actualContainer.Image {
8✔
1920
                        return false
×
1921
                }
×
1922

1923
                // compare the containers env vars except for the ones that are inserted
1924
                // by the operator by default as the SKYHOOK_RESOURCE_ID will change every
1925
                // time the skyhook is updated and would cause every pod to be removed
1926
                // TODO: This is ignoring all the static env vars that are set by operator config.
1927
                // It probably should be just SKYHOOK_RESOURCE_ID that is ignored. Otherwise,
1928
                // a user will have to manually delete the pod to update the package when operator is updated.
1929
                dummyAgentEnv := getAgentConfigEnvVars(opts, "", "", "", "")
8✔
1930
                excludedEnvs := make([]string, len(dummyAgentEnv))
8✔
1931
                for i, env := range dummyAgentEnv {
16✔
1932
                        excludedEnvs[i] = env.Name
8✔
1933
                }
8✔
1934
                expectedFilteredEnv := FilterEnv(expectedContainer.Env, excludedEnvs...)
8✔
1935
                actualFilteredEnv := FilterEnv(actualContainer.Env, excludedEnvs...)
8✔
1936
                if !reflect.DeepEqual(expectedFilteredEnv, actualFilteredEnv) {
14✔
1937
                        return false
6✔
1938
                }
6✔
1939

1940
                if !isInterrupt { // dont compare these since they are not configured on interrupt
16✔
1941
                        // compare resource requests and limits (CPU, memory, etc.)
8✔
1942
                        expectedResources := expectedContainer.Resources
8✔
1943
                        actualResources := actualContainer.Resources
8✔
1944
                        if skyhook.Spec.Packages[_package.Name].Resources != nil {
14✔
1945
                                // If CR has resources specified, they should match exactly
6✔
1946
                                if !reflect.DeepEqual(expectedResources, actualResources) {
7✔
1947
                                        return false
1✔
1948
                                }
1✔
1949
                        } else {
8✔
1950
                                // If CR has no resources specified, ensure pod has no resource overrides
8✔
1951
                                if !limitRange {
16✔
1952
                                        if actualResources.Requests != nil || actualResources.Limits != nil {
9✔
1953
                                                return false
1✔
1954
                                        }
1✔
1955
                                }
1956
                        }
1957
                }
1958
        }
1959

1960
        return true
8✔
1961
}
1962

1963
// ValidateRunningPackages deletes pods that don't match the current spec and checks if there are pods running
1964
// that don't match the node state and removes them if they exist
1965
func (r *SkyhookReconciler) ValidateRunningPackages(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1966

7✔
1967
        update := false
7✔
1968
        errs := make([]error, 0)
7✔
1969
        // get all pods for this skyhook packages
7✔
1970
        pods, err := r.dal.GetPods(ctx,
7✔
1971
                client.MatchingLabels{
7✔
1972
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
7✔
1973
                },
7✔
1974
        )
7✔
1975
        if err != nil {
7✔
1976
                return false, fmt.Errorf("error getting pods while validating packages: %w", err)
×
1977
        }
×
1978
        if pods == nil || len(pods.Items) == 0 {
14✔
1979
                return false, nil // nothing running for this skyhook on this node
7✔
1980
        }
7✔
1981

1982
        // Initialize metrics for each stage
1983
        stages := make(map[string]map[string]map[v1alpha1.Stage]int)
7✔
1984

7✔
1985
        // group pods by node
7✔
1986
        podsbyNode := make(map[string][]corev1.Pod)
7✔
1987
        for _, pod := range pods.Items {
14✔
1988
                podsbyNode[pod.Spec.NodeName] = append(podsbyNode[pod.Spec.NodeName], pod)
7✔
1989
        }
7✔
1990

1991
        for _, node := range skyhook.GetNodes() {
14✔
1992
                nodeState, err := node.State()
7✔
1993
                if err != nil {
7✔
1994
                        return false, fmt.Errorf("error getting node state: %w", err)
×
1995
                }
×
1996

1997
                for _, pod := range podsbyNode[node.GetNode().Name] {
14✔
1998
                        found := false
7✔
1999

7✔
2000
                        runningPackage, err := GetPackage(&pod)
7✔
2001
                        if err != nil {
7✔
2002
                                errs = append(errs, fmt.Errorf("error getting package from pod [%s:%s] while validating packages: %w", pod.Namespace, pod.Name, err))
×
2003
                        }
×
2004

2005
                        // check if the package is part of the skyhook spec, if not we need to delete it
2006
                        for _, v := range skyhook.GetSkyhook().Spec.Packages {
14✔
2007
                                if podMatchesPackage(r.opts, &v, pod, skyhook.GetSkyhook(), runningPackage.Stage) {
14✔
2008
                                        found = true
7✔
2009
                                }
7✔
2010
                        }
2011

2012
                        // Increment the stage count for metrics
2013
                        if _, ok := stages[runningPackage.Name]; !ok {
14✔
2014
                                stages[runningPackage.Name] = make(map[string]map[v1alpha1.Stage]int)
7✔
2015
                                if _, ok := stages[runningPackage.Name][runningPackage.Version]; !ok {
14✔
2016
                                        stages[runningPackage.Name][runningPackage.Version] = make(map[v1alpha1.Stage]int)
7✔
2017
                                        for _, stage := range v1alpha1.Stages {
14✔
2018
                                                stages[runningPackage.Name][runningPackage.Version][stage] = 0
7✔
2019
                                        }
7✔
2020
                                }
2021
                        }
2022
                        stages[runningPackage.Name][runningPackage.Version][runningPackage.Stage]++
7✔
2023

7✔
2024
                        // uninstall is by definition not part of the skyhook spec, so we cant delete it (because it used to be but was removed, hence uninstalling it)
7✔
2025
                        if runningPackage.Stage == v1alpha1.StageUninstall {
12✔
2026
                                found = true
5✔
2027
                        }
5✔
2028

2029
                        if !found {
12✔
2030
                                update = true
5✔
2031

5✔
2032
                                err := r.InvalidPackage(ctx, &pod)
5✔
2033
                                if err != nil {
8✔
2034
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
3✔
2035
                                }
3✔
2036
                                continue
5✔
2037
                        }
2038

2039
                        // Check if package exists in node state, ie a package running that the node state doesn't know about
2040
                        // something that is often done to try to fix bad node state is to clear the node state completely
2041
                        // which if a package is running, we want to terminate it gracefully. Ofthen what leads to this is
2042
                        // the package is in a crashloop and the operator want to restart it the whole package.
2043
                        // when we apply a package it just check if there is a running package on the node for the state of the package
2044
                        // this can cause to leave a pod running in say config mode, and it there is a depends on you might not correctly
2045
                        // run thins in the correct order.
2046
                        deleteMe := false
7✔
2047
                        packageStatus, exists := nodeState[runningPackage.GetUniqueName()]
7✔
2048
                        if !exists { // package not in node state, so we need to delete it
13✔
2049
                                deleteMe = true
6✔
2050
                        } else { // package in node state, so we need to check if it's running
13✔
2051
                                // need check if the stats match, if not we need to delete it
7✔
2052
                                if packageStatus.Stage != runningPackage.Stage {
13✔
2053
                                        deleteMe = true
6✔
2054
                                }
6✔
2055
                        }
2056

2057
                        if deleteMe {
13✔
2058
                                update = true
6✔
2059
                                err := r.InvalidPackage(ctx, &pod)
6✔
2060
                                if err != nil {
10✔
2061
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
2062
                                }
4✔
2063
                        }
2064
                }
2065
        }
2066

2067
        return update, utilerrors.NewAggregate(errs)
7✔
2068
}
2069

2070
// InvalidPackage invalidates a package and updates the pod, which will trigger the pod to be deleted
2071
func (r *SkyhookReconciler) InvalidPackage(ctx context.Context, pod *corev1.Pod) error {
6✔
2072
        err := InvalidatePackage(pod)
6✔
2073
        if err != nil {
6✔
2074
                return fmt.Errorf("error invalidating package: %w", err)
×
2075
        }
×
2076

2077
        err = r.Update(ctx, pod)
6✔
2078
        if err != nil {
11✔
2079
                return fmt.Errorf("error updating pod: %w", err)
5✔
2080
        }
5✔
2081

2082
        return nil
6✔
2083
}
2084

2085
// ProcessInterrupt will check and do the interrupt if need, and returns
2086
// false means we are waiting
2087
// true means we are good to proceed
2088
func (r *SkyhookReconciler) ProcessInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (bool, error) {
7✔
2089

7✔
2090
        if !skyhookNode.HasInterrupt(*_package) {
14✔
2091
                return true, nil
7✔
2092
        }
7✔
2093

2094
        // default starting stage
2095
        stage := v1alpha1.StageApply
5✔
2096
        nextStage := skyhookNode.NextStage(_package)
5✔
2097
        if nextStage != nil {
10✔
2098
                stage = *nextStage
5✔
2099
        }
5✔
2100

2101
        // wait tell this is done if its happening
2102
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
5✔
2103
        if found && status.State == v1alpha1.StateSkipped {
10✔
2104
                return false, nil
5✔
2105
        }
5✔
2106

2107
        // Theres is a race condition when a node reboots and api cleans up the interrupt pod
2108
        // so we need to check if the pod exists and if it does, we need to recreate it
2109
        if status != nil && (status.State == v1alpha1.StateInProgress || status.State == v1alpha1.StateErroring) && status.Stage == v1alpha1.StageInterrupt {
10✔
2110
                // call interrupt to recreate the pod if missing
5✔
2111
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2112
                if err != nil {
5✔
2113
                        return false, err
×
2114
                }
×
2115
        }
2116

2117
        // drain and cordon node before applying package that has an interrupt
2118
        if stage == v1alpha1.StageApply {
10✔
2119
                ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, _package)
5✔
2120
                if err != nil {
5✔
2121
                        return false, err
×
2122
                }
×
2123

2124
                if !ready {
10✔
2125
                        return false, nil
5✔
2126
                }
5✔
2127
        }
2128

2129
        // time to interrupt (once other packages have finished)
2130
        if stage == v1alpha1.StageInterrupt && runInterrupt {
10✔
2131
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2132
                if err != nil {
5✔
2133
                        return false, err
×
2134
                }
×
2135

2136
                return false, nil
5✔
2137
        }
2138

2139
        //skipping
2140
        if stage == v1alpha1.StageInterrupt && !runInterrupt {
10✔
2141
                err := skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, stage, 0, _package.ContainerSHA)
5✔
2142
                if err != nil {
5✔
2143
                        return false, fmt.Errorf("error upserting to skip interrupt: %w", err)
×
2144
                }
×
2145
                return false, nil
5✔
2146
        }
2147

2148
        // wait tell this is done if its happening
2149
        if status != nil && status.Stage == v1alpha1.StageInterrupt && status.State != v1alpha1.StateComplete {
10✔
2150
                return false, nil
5✔
2151
        }
5✔
2152

2153
        return true, nil
5✔
2154
}
2155

2156
func (r *SkyhookReconciler) EnsureNodeIsReadyForInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
5✔
2157
        // cordon node
5✔
2158
        skyhookNode.Cordon()
5✔
2159

5✔
2160
        hasWork, err := r.HasNonInterruptWork(ctx, skyhookNode)
5✔
2161
        if err != nil {
5✔
2162
                return false, err
×
2163
        }
×
2164
        if hasWork { // keep waiting...
10✔
2165
                return false, nil
5✔
2166
        }
5✔
2167

2168
        ready, err := r.DrainNode(ctx, skyhookNode, _package)
5✔
2169
        if err != nil {
5✔
2170
                return false, fmt.Errorf("error draining node [%s]: %w", skyhookNode.GetNode().Name, err)
×
2171
        }
×
2172

2173
        return ready, nil
5✔
2174
}
2175

2176
// ApplyPackage starts a pod on node for the package
2177
func (r *SkyhookReconciler) ApplyPackage(ctx context.Context, logger logr.Logger, clusterState *clusterState, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, runInterrupt bool) error {
7✔
2178

7✔
2179
        if _package == nil {
7✔
2180
                return errors.New("can not apply nil package")
×
2181
        }
×
2182

2183
        // default starting stage
2184
        stage := v1alpha1.StageApply
7✔
2185

7✔
2186
        // These modes don't have anything that comes before them so we must specify them as the
7✔
2187
        // starting point. The next stage function will return nil until these modes complete.
7✔
2188
        // Config is a special case as sometimes apply will come before it and other times it wont
7✔
2189
        // which is why it needs to be here as well
7✔
2190
        if packageStatus, found := skyhookNode.PackageStatus(_package.GetUniqueName()); found {
14✔
2191
                switch packageStatus.Stage {
7✔
2192
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
7✔
2193
                        stage = packageStatus.Stage
7✔
2194
                }
2195
        }
2196

2197
        // if stage != v1alpha1.StageApply {
2198
        //         // If a node gets rest by a user, the about method will return the wrong node state. Above sources it from the skyhook status.
2199
        //         // check if the node has nothing, reset it then apply the package.
2200
        //         nodeState, err := skyhookNode.State()
2201
        //         if err != nil {
2202
        //                 return fmt.Errorf("error getting node state: %w", err)
2203
        //         }
2204

2205
        //         _, found := nodeState[_package.GetUniqueName()]
2206
        //         if !found {
2207
        //                 stage = v1alpha1.StageApply
2208
        //         }
2209
        // }
2210

2211
        nextStage := skyhookNode.NextStage(_package)
7✔
2212
        if nextStage != nil {
14✔
2213
                stage = *nextStage
7✔
2214
        }
7✔
2215

2216
        // test if pod exists, if so, bailout
2217
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
7✔
2218
        if err != nil {
7✔
2219
                return err
×
2220
        }
×
2221

2222
        // wait tell this is done if its happening
2223
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
7✔
2224

7✔
2225
        if found && status.State == v1alpha1.StateSkipped { // skipped, so nothing to do
7✔
2226
                return nil
×
2227
        }
×
2228

2229
        if found && status.State == v1alpha1.StateInProgress { // running, so do nothing atm
14✔
2230
                if exists {
14✔
2231
                        return nil
7✔
2232
                }
7✔
2233
        }
2234

2235
        if exists {
14✔
2236
                // nothing to do here, already running
7✔
2237
                return nil
7✔
2238
        }
7✔
2239

2240
        // Ensure the node metadata configmap exists before creating the pod
2241
        // This prevents a race where the pod starts before its required configmap is created
2242
        if err := r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhookNode.GetSkyhook(), skyhookNode.GetNode()); err != nil {
7✔
2243
                return fmt.Errorf("error upserting node metadata configmap: %w", err)
×
2244
        }
×
2245

2246
        pod := createPodFromPackage(r.opts, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name, stage)
7✔
2247

7✔
2248
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, stage, _package); err != nil {
7✔
2249
                return fmt.Errorf("error setting package on pod: %w", err)
×
2250
        }
×
2251

2252
        // setup ownership of the pod we created
2253
        // helps run time know what to do when something happens to this pod we are about to create
2254
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
7✔
2255
                return fmt.Errorf("error setting ownership: %w", err)
×
2256
        }
×
2257

2258
        if err := r.Create(ctx, pod); err != nil {
7✔
2259
                return fmt.Errorf("error creating pod: %w", err)
×
2260
        }
×
2261

2262
        if err = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, stage, 0, _package.ContainerSHA); err != nil {
7✔
2263
                err = fmt.Errorf("error upserting package: %w", err) // want to keep going in this case, but don't want to lose the err
×
2264
        }
×
2265

2266
        skyhookNode.SetStatus(v1alpha1.StatusInProgress)
7✔
2267

7✔
2268
        skyhookNode.GetSkyhook().AddCondition(metav1.Condition{
7✔
2269
                Type:               fmt.Sprintf("%s/ApplyPackage", v1alpha1.METADATA_PREFIX),
7✔
2270
                Status:             metav1.ConditionTrue,
7✔
2271
                ObservedGeneration: skyhookNode.GetSkyhook().Generation,
7✔
2272
                LastTransitionTime: metav1.Now(),
7✔
2273
                Reason:             "ApplyPackage",
7✔
2274
                Message:            fmt.Sprintf("Applying package [%s:%s] to node [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name),
7✔
2275
        })
7✔
2276

7✔
2277
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] from [skyhook:%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetSkyhook().Name, stage)
7✔
2278
        r.recorder.Eventf(skyhookNode.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] to node [%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name, stage)
7✔
2279

7✔
2280
        skyhookNode.GetSkyhook().Updated = true
7✔
2281

7✔
2282
        return err
7✔
2283
}
2284

2285
// HandleRuntimeRequired finds any nodes for which all runtime required Skyhooks are complete and remove their runtime required taint
2286
// Will return an error if the patching of the nodes is not possible
2287
func (r *SkyhookReconciler) HandleRuntimeRequired(ctx context.Context, clusterState *clusterState) error {
7✔
2288
        node_to_skyhooks, skyhook_node_map := groupSkyhooksByNode(clusterState)
7✔
2289
        to_remove := getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks, skyhook_node_map)
7✔
2290
        // Remove the runtime required taint from nodes in to_remove
7✔
2291
        taint_to_remove := r.opts.GetRuntimeRequiredTaint()
7✔
2292
        errs := make([]error, 0)
7✔
2293
        for _, node := range to_remove {
12✔
2294
                // check before removing taint that it even exists to begin with
5✔
2295
                if !taints.TaintExists(node.Spec.Taints, &taint_to_remove) {
10✔
2296
                        continue
5✔
2297
                }
2298
                // RemoveTaint will ALWAYS return nil for its error so no need to check it
2299
                new_node, updated, _ := taints.RemoveTaint(node, &taint_to_remove)
5✔
2300
                if updated {
10✔
2301
                        err := r.Patch(ctx, new_node, client.MergeFrom(node))
5✔
2302
                        if err != nil {
5✔
2303
                                errs = append(errs, err)
×
2304
                        }
×
2305
                }
2306
        }
2307
        if len(errs) > 0 {
7✔
2308
                return utilerrors.NewAggregate(errs)
×
2309
        }
×
2310
        return nil
7✔
2311
}
2312

2313
// Group Skyhooks by what node they target
2314
func groupSkyhooksByNode(clusterState *clusterState) (map[types.UID][]SkyhookNodes, map[types.UID]*corev1.Node) {
8✔
2315
        node_to_skyhooks := make(map[types.UID][]SkyhookNodes)
8✔
2316
        nodes := make(map[types.UID]*corev1.Node)
8✔
2317
        for _, skyhook := range clusterState.skyhooks {
16✔
2318
                // Ignore skyhooks that don't have runtime required
8✔
2319
                if !skyhook.GetSkyhook().Spec.RuntimeRequired {
16✔
2320
                        continue
8✔
2321
                }
2322
                for _, node := range skyhook.GetNodes() {
12✔
2323
                        if _, ok := node_to_skyhooks[node.GetNode().UID]; !ok {
12✔
2324
                                node_to_skyhooks[node.GetNode().UID] = make([]SkyhookNodes, 0)
6✔
2325
                                nodes[node.GetNode().UID] = node.GetNode()
6✔
2326
                        }
6✔
2327
                        node_to_skyhooks[node.GetNode().UID] = append(node_to_skyhooks[node.GetNode().UID], skyhook)
6✔
2328
                }
2329

2330
        }
2331
        return node_to_skyhooks, nodes
8✔
2332
}
2333

2334
// Get the nodes to remove runtime required taint from node that all skyhooks targeting that node have completed
2335
// Note: This checks per-node completion, not skyhook-level completion. A node's taint is removed when all
2336
// runtime-required skyhooks are complete ON THAT SPECIFIC NODE, regardless of other nodes' completion status.
2337
func getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks map[types.UID][]SkyhookNodes, nodes map[types.UID]*corev1.Node) []*corev1.Node {
8✔
2338
        to_remove := make([]*corev1.Node, 0)
8✔
2339
        for node_uid, skyhooks := range node_to_skyhooks {
14✔
2340
                node := nodes[node_uid]
6✔
2341
                all_complete := true
6✔
2342
                for _, skyhook := range skyhooks {
12✔
2343
                        // Check if THIS specific node is complete for this skyhook (not all nodes)
6✔
2344
                        _, nodeWrapper := skyhook.GetNode(node.Name)
6✔
2345
                        if nodeWrapper == nil || !nodeWrapper.IsComplete() {
12✔
2346
                                all_complete = false
6✔
2347
                                break
6✔
2348
                        }
2349
                }
2350
                if all_complete {
12✔
2351
                        to_remove = append(to_remove, node)
6✔
2352
                }
6✔
2353
        }
2354
        return to_remove
8✔
2355
}
2356

2357
// setPodResources sets resources for all containers and init containers in the pod if override is set, else leaves empty for LimitRange
2358
func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
8✔
2359
        if res == nil {
16✔
2360
                return
8✔
2361
        }
8✔
2362
        if !res.CPURequest.IsZero() || !res.CPULimit.IsZero() || !res.MemoryRequest.IsZero() || !res.MemoryLimit.IsZero() {
12✔
2363
                for i := range pod.Spec.InitContainers {
12✔
2364
                        pod.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{
6✔
2365
                                Limits: corev1.ResourceList{
6✔
2366
                                        corev1.ResourceCPU:    res.CPULimit,
6✔
2367
                                        corev1.ResourceMemory: res.MemoryLimit,
6✔
2368
                                },
6✔
2369
                                Requests: corev1.ResourceList{
6✔
2370
                                        corev1.ResourceCPU:    res.CPURequest,
6✔
2371
                                        corev1.ResourceMemory: res.MemoryRequest,
6✔
2372
                                },
6✔
2373
                        }
6✔
2374
                }
6✔
2375
        }
2376
}
2377

2378
// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
2379
func partitionNodesIntoCompartments(clusterState *clusterState) error {
8✔
2380
        for _, skyhook := range clusterState.skyhooks {
16✔
2381
                // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState)
8✔
2382
                if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" {
16✔
2383
                        continue
8✔
2384
                }
2385

2386
                // Skip if no compartments exist (e.g., deployment policy not found)
2387
                // The webhook should prevent this at admission time, and the controller sets a condition at runtime,
2388
                // but we guard here to prevent panics if the policy goes missing
2389
                if len(skyhook.GetCompartments()) == 0 {
4✔
2390
                        continue
1✔
2391
                }
2392

2393
                // Clear all compartments before reassigning nodes to prevent stale nodes
2394
                // This ensures nodes are only in their current compartment based on current labels
2395
                for _, compartment := range skyhook.GetCompartments() {
6✔
2396
                        compartment.ClearNodes()
3✔
2397
                }
3✔
2398

2399
                for _, node := range skyhook.GetNodes() {
6✔
2400
                        compartmentName, err := skyhook.AssignNodeToCompartment(node)
3✔
2401
                        if err != nil {
3✔
2402
                                return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
×
2403
                        }
×
2404
                        if err := skyhook.AddCompartmentNode(compartmentName, node); err != nil {
3✔
2405
                                return fmt.Errorf("error adding node %s to compartment %s: %w", node.GetNode().Name, compartmentName, err)
×
2406
                        }
×
2407
                }
2408
        }
2409

2410
        return nil
8✔
2411
}
2412

2413
// validateAndUpsertSkyhookData performs validation and configmap operations for a skyhook
2414
func (r *SkyhookReconciler) validateAndUpsertSkyhookData(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, ctrl.Result, error) {
7✔
2415
        if yes, result, err := shouldReturn(r.ValidateRunningPackages(ctx, skyhook)); yes {
13✔
2416
                return yes, result, err
6✔
2417
        }
6✔
2418

2419
        if yes, result, err := shouldReturn(r.ValidateNodeConfigmaps(ctx, skyhook.GetSkyhook().Name, skyhook.GetNodes())); yes {
12✔
2420
                return yes, result, err
5✔
2421
        }
5✔
2422

2423
        if yes, result, err := shouldReturn(r.UpsertConfigmaps(ctx, skyhook, clusterState)); yes {
12✔
2424
                return yes, result, err
5✔
2425
        }
5✔
2426

2427
        return false, ctrl.Result{}, nil
7✔
2428
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc