• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NVIDIA / skyhook / 22926670037

10 Mar 2026 10:12PM UTC coverage: 80.849% (-0.02%) from 80.873%
22926670037

push

github

lockwobr
chore: update chart versions

6894 of 8527 relevant lines covered (80.85%)

3.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.83
/operator/internal/controller/skyhook_controller.go
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package controller
20

21
import (
22
        "cmp"
23
        "context"
24
        "crypto/sha256"
25
        "encoding/hex"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "slices"
31
        "sort"
32
        "strings"
33
        "time"
34

35
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
36
        "github.com/NVIDIA/skyhook/operator/internal/dal"
37
        "github.com/NVIDIA/skyhook/operator/internal/version"
38
        "github.com/NVIDIA/skyhook/operator/internal/wrapper"
39
        "github.com/go-logr/logr"
40

41
        corev1 "k8s.io/api/core/v1"
42
        policyv1 "k8s.io/api/policy/v1"
43
        apierrors "k8s.io/apimachinery/pkg/api/errors"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/runtime"
47
        "k8s.io/apimachinery/pkg/types"
48
        utilerrors "k8s.io/apimachinery/pkg/util/errors"
49
        "k8s.io/client-go/tools/record"
50
        "k8s.io/kubernetes/pkg/util/taints"
51
        ctrl "sigs.k8s.io/controller-runtime"
52
        "sigs.k8s.io/controller-runtime/pkg/client"
53
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/log"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
)
58

59
// Reason and type strings passed to the event recorder (see the recorder.Eventf
// calls in this file), plus other well-known names used by the controller.
const (
	EventsReasonSkyhookApply       = "Apply"
	EventsReasonSkyhookInterrupt   = "Interrupt"
	EventsReasonSkyhookDrain       = "Drain"
	EventsReasonSkyhookStateChange = "State"
	EventsReasonNodeReboot         = "Reboot"
	EventTypeNormal                = "Normal"
	// EventTypeWarning = "Warning"
	// TaintUnschedulable aliases the core node-unschedulable taint key.
	TaintUnschedulable     = corev1.TaintNodeUnschedulable
	InterruptContainerName = "interrupt"

	// SkyhookFinalizer is the finalizer key used for Skyhook resources.
	SkyhookFinalizer = "skyhook.nvidia.com/skyhook"
)
72

73
// SkyhookOperatorOptions holds the operator-wide configuration.
//
// The `env` struct tags presumably map each field to an environment variable and
// its default value (parsed by an env-config library elsewhere — confirm). Note that
// MaxInterval is read from DEFAULT_INTERVAL; it is the requeue delay used after a
// fully successful reconcile and Validate requires it to be at least one minute.
// RuntimeRequiredTaint is a taint spec ("key=value:Effect") that Validate parses.
type SkyhookOperatorOptions struct {
	Namespace            string        `env:"NAMESPACE, default=skyhook"`
	MaxInterval          time.Duration `env:"DEFAULT_INTERVAL, default=10m"`
	ImagePullSecret      string        `env:"IMAGE_PULL_SECRET"`
	CopyDirRoot          string        `env:"COPY_DIR_ROOT, default=/var/lib/skyhook"`
	ReapplyOnReboot      bool          `env:"REAPPLY_ON_REBOOT, default=false"`
	RuntimeRequiredTaint string        `env:"RUNTIME_REQUIRED_TAINT, default=skyhook.nvidia.com=runtime-required:NoSchedule"`
	PauseImage           string        `env:"PAUSE_IMAGE, default=registry.k8s.io/pause:3.10"`
	AgentImage           string        `env:"AGENT_IMAGE, default=ghcr.io/nvidia/skyhook/agent:latest"` // TODO: this needs to be updated with a working default
	AgentLogRoot         string        `env:"AGENT_LOG_ROOT, default=/var/log/skyhook"`
}
84

85
func (o *SkyhookOperatorOptions) Validate() error {
7✔
86

7✔
87
        messages := make([]string, 0)
7✔
88
        if o.Namespace == "" {
7✔
89
                messages = append(messages, "namespace must be set")
×
90
        }
×
91
        if o.CopyDirRoot == "" {
7✔
92
                messages = append(messages, "copy dir root must be set")
×
93
        }
×
94
        if o.RuntimeRequiredTaint == "" {
7✔
95
                messages = append(messages, "runtime required taint must be set")
×
96
        }
×
97
        if o.MaxInterval < time.Minute {
8✔
98
                messages = append(messages, "max interval must be at least 1 minute")
1✔
99
        }
1✔
100

101
        // CopyDirRoot must start with /
102
        if !strings.HasPrefix(o.CopyDirRoot, "/") {
8✔
103
                messages = append(messages, "copy dir root must start with /")
1✔
104
        }
1✔
105

106
        // RuntimeRequiredTaint must be parsable and must not be a deletion
107
        _, delete, err := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
7✔
108
        if err != nil {
8✔
109
                messages = append(messages, fmt.Sprintf("runtime required taint is invalid: %s", err.Error()))
1✔
110
        }
1✔
111
        if len(delete) > 0 {
8✔
112
                messages = append(messages, "runtime required taint must not be a deletion")
1✔
113
        }
1✔
114

115
        if o.AgentImage == "" {
8✔
116
                messages = append(messages, "agent image must be set")
1✔
117
        }
1✔
118

119
        if !strings.Contains(o.AgentImage, ":") {
8✔
120
                messages = append(messages, "agent image must contain a tag")
1✔
121
        }
1✔
122

123
        if o.PauseImage == "" {
8✔
124
                messages = append(messages, "pause image must be set")
1✔
125
        }
1✔
126

127
        if !strings.Contains(o.PauseImage, ":") {
8✔
128
                messages = append(messages, "pause image must contain a tag")
1✔
129
        }
1✔
130

131
        if len(messages) > 0 {
8✔
132
                return errors.New(strings.Join(messages, ", "))
1✔
133
        }
1✔
134

135
        return nil
7✔
136
}
137

138
// AgentVersion returns the image tag portion of AgentImage
139
func (o *SkyhookOperatorOptions) AgentVersion() string {
7✔
140
        parts := strings.Split(o.AgentImage, ":")
7✔
141
        return parts[len(parts)-1]
7✔
142
}
7✔
143

144
func (o *SkyhookOperatorOptions) GetRuntimeRequiredTaint() corev1.Taint {
7✔
145
        to_add, _, _ := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
7✔
146
        return to_add[0]
7✔
147
}
7✔
148

149
func (o *SkyhookOperatorOptions) GetRuntimeRequiredToleration() corev1.Toleration {
7✔
150
        taint := o.GetRuntimeRequiredTaint()
7✔
151
        return corev1.Toleration{
7✔
152
                Key:      taint.Key,
7✔
153
                Operator: corev1.TolerationOpEqual,
7✔
154
                Value:    taint.Value,
7✔
155
                Effect:   taint.Effect,
7✔
156
        }
7✔
157
}
7✔
158

159
// force type checking against this interface
160
var _ reconcile.Reconciler = &SkyhookReconciler{}
161

162
func NewSkyhookReconciler(schema *runtime.Scheme, c client.Client, recorder record.EventRecorder, opts SkyhookOperatorOptions) (*SkyhookReconciler, error) {
7✔
163

7✔
164
        err := opts.Validate()
7✔
165
        if err != nil {
7✔
166
                return nil, fmt.Errorf("invalid skyhook operator options: %w", err)
×
167
        }
×
168

169
        return &SkyhookReconciler{
7✔
170
                Client:   c,
7✔
171
                scheme:   schema,
7✔
172
                recorder: recorder,
7✔
173
                opts:     opts,
7✔
174
                dal:      dal.New(c),
7✔
175
        }, nil
7✔
176
}
177

178
// SkyhookReconciler reconciles a Skyhook object.
//
// Instances are built by NewSkyhookReconciler, which validates opts before use and
// creates dal from the same client that is embedded here.
type SkyhookReconciler struct {
	client.Client
	scheme   *runtime.Scheme
	recorder record.EventRecorder
	opts     SkyhookOperatorOptions
	dal      dal.DAL
}
186

187
// SetupWithManager sets up the controller with the Manager.
188
func (r *SkyhookReconciler) SetupWithManager(mgr ctrl.Manager) error {
7✔
189

7✔
190
        // indexes allow for query on fields to use the local cache
7✔
191
        indexer := mgr.GetFieldIndexer()
7✔
192
        err := indexer.
7✔
193
                IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
13✔
194
                        pod, ok := o.(*corev1.Pod)
6✔
195
                        if !ok {
6✔
196
                                return nil
×
197
                        }
×
198
                        return []string{pod.Spec.NodeName}
6✔
199
                })
200

201
        if err != nil {
7✔
202
                return err
×
203
        }
×
204

205
        ehandler := &eventHandler{
7✔
206
                logger: mgr.GetLogger(),
7✔
207
                dal:    dal.New(r.Client),
7✔
208
        }
7✔
209

7✔
210
        return ctrl.NewControllerManagedBy(mgr).
7✔
211
                For(&v1alpha1.Skyhook{}).
7✔
212
                Watches(
7✔
213
                        &corev1.Pod{},
7✔
214
                        handler.EnqueueRequestsFromMapFunc(podHandlerFunc),
7✔
215
                ).
7✔
216
                Watches(
7✔
217
                        &corev1.Node{},
7✔
218
                        ehandler,
7✔
219
                ).
7✔
220
                Complete(r)
7✔
221
}
222

223
// CRD Permissions
224
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks,verbs=get;list;watch;create;update;patch;delete
225
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/status,verbs=get;update;patch
226
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/finalizers,verbs=update
227
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=deploymentpolicies,verbs=get;list;watch
228

229
// core permissions
230
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;update;patch;watch
231
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
232
//+kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
233
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
234
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
235

236
// Reconcile is part of the main kubernetes reconciliation loop which aims to
237
// move the current state of the cluster closer to the desired state.
238
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile
239
func (r *SkyhookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
7✔
240
        logger := log.FromContext(ctx)
7✔
241
        // split off requests for pods
7✔
242
        if strings.HasPrefix(req.Name, "pod---") {
13✔
243
                name := strings.Split(req.Name, "pod---")[1]
6✔
244
                pod, err := r.dal.GetPod(ctx, req.Namespace, name)
6✔
245
                if err == nil && pod != nil { // if pod, then call other wise not a pod
12✔
246
                        return r.PodReconcile(ctx, pod)
6✔
247
                }
6✔
248
                return ctrl.Result{}, err
6✔
249
        }
250

251
        // get all skyhooks (SCR)
252
        skyhooks, err := r.dal.GetSkyhooks(ctx)
7✔
253
        if err != nil {
7✔
254
                // error, going to requeue and backoff
×
255
                logger.Error(err, "error getting skyhooks")
×
256
                return ctrl.Result{}, err
×
257
        }
×
258

259
        // if there are no skyhooks, so actually nothing to do, so don't requeue
260
        if skyhooks == nil || len(skyhooks.Items) == 0 {
14✔
261
                return ctrl.Result{}, nil
7✔
262
        }
7✔
263

264
        // get all nodes
265
        nodes, err := r.dal.GetNodes(ctx)
6✔
266
        if err != nil {
6✔
267
                // error, going to requeue and backoff
×
268
                logger.Error(err, "error getting nodes")
×
269
                return ctrl.Result{}, err
×
270
        }
×
271

272
        // if no nodes, well not work to do either
273
        if nodes == nil || len(nodes.Items) == 0 {
6✔
274
                // no nodes, so nothing to do
×
275
                return ctrl.Result{}, nil
×
276
        }
×
277

278
        // get all deployment policies
279
        deploymentPolicies, err := r.dal.GetDeploymentPolicies(ctx)
6✔
280
        if err != nil {
6✔
281
                logger.Error(err, "error getting deployment policies")
×
282
                return ctrl.Result{}, err
×
283
        }
×
284

285
        // TODO: this build state could error in a lot of ways, and I think we might want to move towards partial state
286
        // mean if we cant get on SCR state, great, process that one and error
287

288
        // BUILD cluster state from all skyhooks, and all nodes
289
        // this filters and pairs up nodes to skyhooks, also provides help methods for introspection and mutation
290
        clusterState, err := BuildState(skyhooks, nodes, deploymentPolicies)
6✔
291
        if err != nil {
6✔
292
                // error, going to requeue and backoff
×
293
                logger.Error(err, "error building cluster state")
×
294
                return ctrl.Result{}, err
×
295
        }
×
296

297
        // handle auto-tainting new nodes first so it
298
        if yes, result, err := shouldReturn(r.HandleAutoTaint(ctx, clusterState)); yes {
10✔
299
                return result, err
4✔
300
        }
4✔
301

302
        if yes, result, err := shouldReturn(r.HandleMigrations(ctx, clusterState)); yes {
12✔
303
                return result, err
6✔
304
        }
6✔
305

306
        if yes, result, err := shouldReturn(r.TrackReboots(ctx, clusterState)); yes {
12✔
307
                return result, err
6✔
308
        }
6✔
309

310
        // node picker is for selecting nodes to do work, tries maintain a prior of nodes between SCRs
311
        nodePicker := NewNodePicker(logger, r.opts.GetRuntimeRequiredToleration())
6✔
312

6✔
313
        errs := make([]error, 0)
6✔
314
        var result *ctrl.Result
6✔
315

6✔
316
        for _, skyhook := range clusterState.skyhooks {
12✔
317
                if yes, result, err := shouldReturn(r.HandleFinalizer(ctx, skyhook)); yes {
12✔
318
                        return result, err
6✔
319
                }
6✔
320

321
                if yes, result, err := shouldReturn(r.ReportState(ctx, clusterState, skyhook)); yes {
12✔
322
                        return result, err
6✔
323
                }
6✔
324

325
                if skyhook.IsPaused() {
11✔
326
                        if yes, result, err := shouldReturn(r.UpdatePauseStatus(ctx, clusterState, skyhook)); yes {
10✔
327
                                return result, err
5✔
328
                        }
5✔
329
                        continue
5✔
330
                }
331

332
                if yes, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState); yes {
11✔
333
                        return result, err
5✔
334
                }
5✔
335

336
                changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
6✔
337
                if changed {
12✔
338
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
339
                        if len(errs) > 0 {
10✔
340
                                return ctrl.Result{RequeueAfter: time.Second * 2}, utilerrors.NewAggregate(errs)
4✔
341
                        }
4✔
342
                        return ctrl.Result{RequeueAfter: time.Second * 2}, nil
6✔
343
                }
344

345
                _, err := HandleVersionChange(skyhook)
6✔
346
                if err != nil {
6✔
347
                        return ctrl.Result{RequeueAfter: time.Second * 2}, fmt.Errorf("error getting packages to uninstall: %w", err)
×
348
                }
×
349
        }
350

351
        // Process all non-complete, non-disabled skyhooks (in priority order)
352
        // Each skyhook is processed only for nodes that are ready (all higher-priority skyhooks complete on that node)
353
        // This enables per-node priority ordering: nodes can progress independently
354
        result, err = r.processSkyhooksPerNode(ctx, clusterState, nodePicker, logger)
6✔
355
        if err != nil {
10✔
356
                errs = append(errs, err)
4✔
357
        }
4✔
358

359
        err = r.HandleRuntimeRequired(ctx, clusterState)
6✔
360
        if err != nil {
6✔
361
                errs = append(errs, err)
×
362
        }
×
363

364
        if len(errs) > 0 {
10✔
365
                err := utilerrors.NewAggregate(errs)
4✔
366
                return ctrl.Result{}, err
4✔
367
        }
4✔
368

369
        if result != nil {
12✔
370
                return *result, nil
6✔
371
        }
6✔
372

373
        // default happy retry after max
374
        return ctrl.Result{RequeueAfter: r.opts.MaxInterval}, nil
6✔
375
}
376

377
// processSkyhooksPerNode processes all skyhooks for nodes that are ready (per-node priority ordering).
378
// A node is ready for a skyhook if all higher-priority skyhooks are complete on that specific node.
379
func (r *SkyhookReconciler) processSkyhooksPerNode(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, logger logr.Logger) (*ctrl.Result, error) {
6✔
380
        var result *ctrl.Result
6✔
381
        var errs []error
6✔
382

6✔
383
        for _, skyhook := range clusterState.skyhooks {
12✔
384
                if skyhook.IsComplete() || skyhook.IsDisabled() || skyhook.IsPaused() {
12✔
385
                        continue
6✔
386
                }
387

388
                // Check if any nodes are ready for this skyhook
389
                if !hasReadyNodesForSkyhook(skyhook, clusterState.skyhooks) {
10✔
390
                        continue
4✔
391
                }
392

393
                res, err := r.RunSkyhookPackages(ctx, clusterState, nodePicker, skyhook)
6✔
394
                if err != nil {
10✔
395
                        logger.Error(err, "error processing skyhook", "skyhook", skyhook.GetSkyhook().Name)
4✔
396
                        errs = append(errs, err)
4✔
397
                }
4✔
398
                if res != nil {
12✔
399
                        result = res
6✔
400
                }
6✔
401
        }
402

403
        if len(errs) > 0 {
10✔
404
                return result, utilerrors.NewAggregate(errs)
4✔
405
        }
4✔
406
        return result, nil
6✔
407
}
408

409
// hasReadyNodesForSkyhook checks if any nodes are ready to process this skyhook.
410
// A node is ready if it's not complete and all higher-priority skyhooks are complete on that node.
411
func hasReadyNodesForSkyhook(skyhook SkyhookNodes, allSkyhooks []SkyhookNodes) bool {
6✔
412
        for _, node := range skyhook.GetNodes() {
12✔
413
                if !node.IsComplete() && IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, allSkyhooks) {
12✔
414
                        return true
6✔
415
                }
6✔
416
        }
417
        return false
4✔
418
}
419

420
func shouldReturn(updates bool, err error) (bool, ctrl.Result, error) {
6✔
421
        if err != nil {
12✔
422
                return true, ctrl.Result{}, err
6✔
423
        }
6✔
424
        if updates {
12✔
425
                return true, ctrl.Result{RequeueAfter: time.Second * 2}, nil
6✔
426
        }
6✔
427
        return false, ctrl.Result{}, nil
6✔
428
}
429

430
func (r *SkyhookReconciler) HandleMigrations(ctx context.Context, clusterState *clusterState) (bool, error) {
6✔
431

6✔
432
        updates := false
6✔
433

6✔
434
        if version.VERSION == "" {
6✔
435
                // this means the binary was complied without version information
×
436
                return false, nil
×
437
        }
×
438

439
        logger := log.FromContext(ctx)
6✔
440
        errors := make([]error, 0)
6✔
441
        for _, skyhook := range clusterState.skyhooks {
12✔
442

6✔
443
                err := skyhook.Migrate(logger)
6✔
444
                if err != nil {
6✔
445
                        return false, fmt.Errorf("error migrating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
446
                }
×
447

448
                if err := skyhook.GetSkyhook().Skyhook.Validate(); err != nil {
6✔
449
                        return false, fmt.Errorf("error validating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
450
                }
×
451

452
                for _, node := range skyhook.GetNodes() {
12✔
453
                        if node.Changed() {
12✔
454
                                err := r.Status().Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
6✔
455
                                if err != nil {
6✔
456
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
457
                                }
×
458

459
                                err = r.Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
6✔
460
                                if err != nil {
7✔
461
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
1✔
462
                                }
1✔
463
                                updates = true
6✔
464
                        }
465
                }
466

467
                if skyhook.GetSkyhook().Updated {
12✔
468
                        // need to do this because SaveNodesAndSkyhook only saves skyhook status, not the main skyhook object where the annotations are
6✔
469
                        // additionally it needs to be an update, a patch nils out the annotations for some reason, which the save function does a patch
6✔
470

6✔
471
                        if err = r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
12✔
472
                                return false, fmt.Errorf("error updating during migration skyhook status [%s]: %w", skyhook.GetSkyhook().Name, err)
6✔
473
                        }
6✔
474

475
                        // because of conflict issues (409) we need to do things a bit differently here.
476
                        // We might be able to use server side apply in the future, but for now we need to do this
477
                        // https://kubernetes.io/docs/reference/using-api/server-side-apply/
478
                        // https://github.com/kubernetes-sigs/controller-runtime/issues/347
479

480
                        // work around for now is to grab a new copy of the object, and then patch it
481

482
                        newskyhook, err := r.dal.GetSkyhook(ctx, skyhook.GetSkyhook().Name)
6✔
483
                        if err != nil {
6✔
484
                                return false, fmt.Errorf("error getting skyhook to migrate [%s]: %w", skyhook.GetSkyhook().Name, err)
×
485
                        }
×
486
                        newPatch := client.MergeFrom(newskyhook.DeepCopy())
6✔
487

6✔
488
                        // set version
6✔
489
                        wrapper.NewSkyhookWrapper(newskyhook).SetVersion()
6✔
490

6✔
491
                        if err = r.Patch(ctx, newskyhook, newPatch); err != nil {
6✔
492
                                return false, fmt.Errorf("error updating during migration skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
493
                        }
×
494

495
                        updates = true
6✔
496
                }
497
        }
498

499
        if len(errors) > 0 {
7✔
500
                return false, utilerrors.NewAggregate(errors)
1✔
501
        }
1✔
502

503
        return updates, nil
6✔
504
}
505

506
// ReportState computes and puts important information into the skyhook status so that monitoring tools such as k9s
507
// can see the information at a glance. For example, the number of completed nodes and the list of packages in the skyhook.
508
func (r *SkyhookReconciler) ReportState(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
6✔
509

6✔
510
        // save updated state to skyhook status
6✔
511
        skyhook.ReportState()
6✔
512

6✔
513
        if skyhook.GetSkyhook().Updated {
12✔
514
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
515
                if len(errs) > 0 {
6✔
516
                        return false, utilerrors.NewAggregate(errs)
×
517
                }
×
518
                return true, nil
6✔
519
        }
520

521
        return false, nil
6✔
522
}
523

524
func (r *SkyhookReconciler) UpdatePauseStatus(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
5✔
525
        changed := UpdateSkyhookPauseStatus(skyhook)
5✔
526

5✔
527
        if changed {
10✔
528
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
5✔
529
                if len(errs) > 0 {
5✔
530
                        return false, utilerrors.NewAggregate(errs)
×
531
                }
×
532
                return true, nil
5✔
533
        }
534

535
        return false, nil
5✔
536
}
537

538
func (r *SkyhookReconciler) TrackReboots(ctx context.Context, clusterState *clusterState) (bool, error) {
6✔
539

6✔
540
        updates := false
6✔
541
        errs := make([]error, 0)
6✔
542

6✔
543
        for _, skyhook := range clusterState.skyhooks {
12✔
544
                if skyhook.GetSkyhook().Status.NodeBootIds == nil {
12✔
545
                        skyhook.GetSkyhook().Status.NodeBootIds = make(map[string]string)
6✔
546
                }
6✔
547

548
                for _, node := range skyhook.GetNodes() {
12✔
549
                        id, ok := skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name]
6✔
550

6✔
551
                        if !ok { // new node
12✔
552
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
6✔
553
                                skyhook.GetSkyhook().Updated = true
6✔
554
                        }
6✔
555

556
                        if id != "" && id != node.GetNode().Status.NodeInfo.BootID { // node rebooted
6✔
557
                                if r.opts.ReapplyOnReboot {
×
558
                                        r.recorder.Eventf(skyhook.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node [%s] to be reapplied", node.GetNode().Name)
×
559
                                        r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node for [%s] to be reapplied", node.GetSkyhook().Name)
×
560
                                        node.Reset()
×
561
                                }
×
562
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
×
563
                                skyhook.GetSkyhook().Updated = true
×
564
                        }
565

566
                        if node.Changed() { // update
6✔
567
                                updates = true
×
568
                                err := r.Update(ctx, node.GetNode())
×
569
                                if err != nil {
×
570
                                        errs = append(errs, fmt.Errorf("error updating node after reboot [%s]: %w", node.GetNode().Name, err))
×
571
                                }
×
572
                        }
573
                }
574
                if skyhook.GetSkyhook().Updated { // update
12✔
575
                        updates = true
6✔
576
                        err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook)
6✔
577
                        if err != nil {
11✔
578
                                errs = append(errs, fmt.Errorf("error updating skyhook status after reboot [%s]: %w", skyhook.GetSkyhook().Name, err))
5✔
579
                        }
5✔
580
                }
581
        }
582

583
        return updates, utilerrors.NewAggregate(errs)
6✔
584
}
585

586
// RunSkyhookPackages runs all skyhook packages then saves and requeues if changes were made
587
func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, skyhook SkyhookNodes) (*ctrl.Result, error) {
6✔
588

6✔
589
        logger := log.FromContext(ctx)
6✔
590
        requeue := false
6✔
591

6✔
592
        toUninstall, err := HandleVersionChange(skyhook)
6✔
593
        if err != nil {
6✔
594
                return nil, fmt.Errorf("error getting packages to uninstall: %w", err)
×
595
        }
×
596

597
        changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
6✔
598
        if !changed && skyhook.IsComplete() {
6✔
599
                return nil, nil
×
600
        }
×
601

602
        selectedNode := nodePicker.SelectNodes(skyhook)
6✔
603

6✔
604
        for _, node := range selectedNode {
12✔
605
                // Skip nodes that are waiting on higher-priority skyhooks
6✔
606
                // This enables per-node priority ordering
6✔
607
                if !IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, clusterState.skyhooks) {
10✔
608
                        continue
4✔
609
                }
610

611
                if node.IsComplete() && !node.Changed() {
6✔
612
                        continue
×
613
                }
614

615
                toRun, err := node.RunNext()
6✔
616
                if err != nil {
6✔
617
                        return nil, fmt.Errorf("error getting next packages to run: %w", err)
×
618
                }
×
619

620
                // prepend the uninstall packages so they are ran first
621
                toRun = append(toUninstall, toRun...)
6✔
622

6✔
623
                interrupt, pack := fudgeInterruptWithPriority(toRun, skyhook.GetSkyhook().GetConfigUpdates(), skyhook.GetSkyhook().GetConfigInterrupts())
6✔
624

6✔
625
                for _, f := range toRun {
12✔
626

6✔
627
                        ok, err := r.ProcessInterrupt(ctx, node, f, interrupt, interrupt != nil && f.Name == pack)
6✔
628
                        if err != nil {
6✔
629
                                // TODO: error handle
×
630
                                return nil, fmt.Errorf("error processing if we should interrupt [%s:%s]: %w", f.Name, f.Version, err)
×
631
                        }
×
632
                        if !ok {
10✔
633
                                requeue = true
4✔
634
                                continue
4✔
635
                        }
636

637
                        err = r.ApplyPackage(ctx, logger, clusterState, node, f, interrupt != nil && f.Name == pack)
6✔
638
                        if err != nil {
6✔
639
                                return nil, fmt.Errorf("error applying package [%s:%s]: %w", f.Name, f.Version, err)
×
640
                        }
×
641

642
                        // process one package at a time
643
                        if skyhook.GetSkyhook().Spec.Serial {
6✔
644
                                return &ctrl.Result{Requeue: true}, nil
×
645
                        }
×
646
                }
647
        }
648

649
        saved, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
650
        if len(errs) > 0 {
10✔
651
                return &ctrl.Result{}, utilerrors.NewAggregate(errs)
4✔
652
        }
4✔
653
        if saved {
12✔
654
                requeue = true
6✔
655
        }
6✔
656

657
        if !skyhook.IsComplete() || requeue {
12✔
658
                return &ctrl.Result{RequeueAfter: time.Second * 2}, nil // not sure this is better then just requeue bool
6✔
659
        }
6✔
660

661
        return nil, utilerrors.NewAggregate(errs)
×
662
}
663

664
// SaveNodesAndSkyhook saves nodes and skyhook and will update the events if the skyhook status changes
665
func (r *SkyhookReconciler) SaveNodesAndSkyhook(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, []error) {
6✔
666
        saved := false
6✔
667
        errs := make([]error, 0)
6✔
668

6✔
669
        for _, node := range skyhook.GetNodes() {
12✔
670
                patch := client.StrategicMergeFrom(clusterState.tracker.GetOriginal(node.GetNode()))
6✔
671
                if node.Changed() {
12✔
672
                        err := r.Patch(ctx, node.GetNode(), patch)
6✔
673
                        if err != nil {
6✔
674
                                errs = append(errs, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
675
                        }
×
676
                        saved = true
6✔
677

6✔
678
                        err = r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhook.GetSkyhook(), node.GetNode())
6✔
679
                        if err != nil {
6✔
680
                                errs = append(errs, fmt.Errorf("error upserting labels, annotations, and packages config map for node [%s]: %w", node.GetNode().Name, err))
×
681
                        }
×
682

683
                        if node.IsComplete() {
12✔
684
                                r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook [%s] complete.", skyhook.GetSkyhook().Name)
6✔
685

6✔
686
                                // since node is complete remove from priority
6✔
687
                                if _, ok := skyhook.GetSkyhook().Status.NodePriority[node.GetNode().Name]; ok {
12✔
688
                                        delete(skyhook.GetSkyhook().Status.NodePriority, node.GetNode().Name)
6✔
689
                                        skyhook.GetSkyhook().Updated = true
6✔
690
                                }
6✔
691
                        }
692
                }
693

694
                // updates node's condition
695
                node.UpdateCondition()
6✔
696
                if node.Changed() {
12✔
697
                        // conditions are in status
6✔
698
                        err := r.Status().Patch(ctx, node.GetNode(), patch)
6✔
699
                        if err != nil {
10✔
700
                                errs = append(errs, fmt.Errorf("error patching node status [%s]: %w", node.GetNode().Name, err))
4✔
701
                        }
4✔
702
                        saved = true
6✔
703
                }
704

705
                if node.GetSkyhook() != nil && node.GetSkyhook().Updated {
12✔
706
                        skyhook.GetSkyhook().Updated = true
6✔
707
                }
6✔
708
        }
709

710
        if skyhook.GetSkyhook().Updated {
12✔
711
                patch := client.MergeFrom(clusterState.tracker.GetOriginal(skyhook.GetSkyhook().Skyhook))
6✔
712
                err := r.Status().Patch(ctx, skyhook.GetSkyhook().Skyhook, patch)
6✔
713
                if err != nil {
6✔
714
                        errs = append(errs, err)
×
715
                }
×
716
                saved = true
6✔
717

6✔
718
                if skyhook.GetPriorStatus() != "" && skyhook.GetPriorStatus() != skyhook.Status() {
12✔
719
                        // we transitioned, fire event
6✔
720
                        r.recorder.Eventf(skyhook.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook transitioned [%s] -> [%s]", skyhook.GetPriorStatus(), skyhook.Status())
6✔
721
                }
6✔
722
        }
723

724
        if len(errs) > 0 {
10✔
725
                saved = false
4✔
726
        }
4✔
727
        return saved, errs
6✔
728
}
729

730
// HandleVersionChange updates the state for the node or skyhook if a version is changed on a package
731
func HandleVersionChange(skyhook SkyhookNodes) ([]*v1alpha1.Package, error) {
7✔
732
        toUninstall := make([]*v1alpha1.Package, 0)
7✔
733
        versionChangeDetected := false
7✔
734

7✔
735
        for _, node := range skyhook.GetNodes() {
14✔
736
                nodeState, err := node.State()
7✔
737
                if err != nil {
7✔
738
                        return nil, err
×
739
                }
×
740

741
                for _, packageStatus := range nodeState {
14✔
742
                        upgrade := false
7✔
743

7✔
744
                        _package, exists := skyhook.GetSkyhook().Spec.Packages[packageStatus.Name]
7✔
745
                        if exists && _package.Version == packageStatus.Version {
14✔
746
                                continue // no uninstall needed for package
7✔
747
                        }
748

749
                        packageStatusRef := v1alpha1.PackageRef{
5✔
750
                                Name:    packageStatus.Name,
5✔
751
                                Version: packageStatus.Version,
5✔
752
                        }
5✔
753

5✔
754
                        if !exists && packageStatus.Stage != v1alpha1.StageUninstall {
9✔
755
                                // Start uninstall of old package
4✔
756
                                err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
4✔
757
                                if err != nil {
4✔
758
                                        return nil, fmt.Errorf("error updating node status: %w", err)
×
759
                                }
×
760
                                versionChangeDetected = true
4✔
761
                        } else if exists && _package.Version != packageStatus.Version {
10✔
762
                                versionChangeDetected = true
5✔
763
                                comparison := version.Compare(_package.Version, packageStatus.Version)
5✔
764
                                if comparison == -2 {
5✔
765
                                        return nil, errors.New("error comparing package versions: invalid version string provided enabling webhooks validates versions before being applied")
×
766
                                }
×
767

768
                                if comparison == 1 {
10✔
769
                                        _packageStatus, found := node.PackageStatus(_package.GetUniqueName())
5✔
770
                                        if found && _packageStatus.Stage == v1alpha1.StageUpgrade {
10✔
771
                                                continue
5✔
772
                                        }
773

774
                                        // start upgrade of package
775
                                        err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageUpgrade, 0, _package.ContainerSHA)
4✔
776
                                        if err != nil {
4✔
777
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
778
                                        }
×
779

780
                                        upgrade = true
4✔
781
                                } else if comparison == -1 && packageStatus.Stage != v1alpha1.StageUninstall {
8✔
782
                                        // Start uninstall of old package
4✔
783
                                        err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
4✔
784
                                        if err != nil {
4✔
785
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
786
                                        }
×
787

788
                                        // If version changed then update new version to wait
789
                                        err = node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, v1alpha1.StageUninstall, 0, _package.ContainerSHA)
4✔
790
                                        if err != nil {
4✔
791
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
792
                                        }
×
793
                                }
794
                        }
795

796
                        // only need to create a feaux package for uninstall since it won't be in the DAG (Upgrade will)
797
                        newPackageStatus, found := node.PackageStatus(packageStatusRef.GetUniqueName())
4✔
798
                        if !upgrade && found && newPackageStatus.Stage == v1alpha1.StageUninstall && newPackageStatus.State == v1alpha1.StateInProgress {
8✔
799
                                // create fake package with the info we can salvage from the node state
4✔
800
                                newPackage := &v1alpha1.Package{
4✔
801
                                        PackageRef: packageStatusRef,
4✔
802
                                        Image:      packageStatus.Image,
4✔
803
                                }
4✔
804

4✔
805
                                // Add package to uninstall list if it's not already present
4✔
806
                                found := false
4✔
807
                                for _, uninstallPackage := range toUninstall {
8✔
808
                                        if reflect.DeepEqual(uninstallPackage, newPackage) {
4✔
809
                                                found = true
×
810
                                        }
×
811
                                }
812

813
                                if !found {
8✔
814
                                        toUninstall = append(toUninstall, newPackage)
4✔
815
                                }
4✔
816
                        }
817

818
                        // remove all config updates for the package since it's being uninstalled or
819
                        // upgraded. NOTE: The config updates must be removed whenever the version changes
820
                        // or else the package interrupt may be skipped if there is one
821
                        skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
4✔
822

4✔
823
                        // set the node and skyhook status to in progress
4✔
824
                        node.SetStatus(v1alpha1.StatusInProgress)
4✔
825
                }
826
        }
827

828
        // Auto-reset batch state when version changes are detected (if configured)
829
        if versionChangeDetected {
12✔
830
                resetSkyhookBatchState(skyhook)
5✔
831
        }
5✔
832

833
        return toUninstall, nil
7✔
834
}
835

836
// ptr returns a pointer to a fresh copy of e, which is convenient for filling pointer-typed
// fields from literals or constants.
func ptr[E any](e E) *E {
	p := new(E)
	*p = e
	return p
}
7✔
840

841
// generateSafeName builds a lowercase Kubernetes resource name from nameParts that is at most
// maxLen characters long.
//
// The parts are joined with "-" and dots are replaced by "-". An 8-character hex prefix of the
// SHA-256 of that joined name is appended after a "-", so truncated names stay distinct. The
// readable prefix is truncated as needed to fit maxLen. If maxLen is too small to hold any prefix
// plus the separator, only as much of the hash as fits is returned, and "" when maxLen <= 0.
func generateSafeName(maxLen int, nameParts ...string) string {
	// Replace dots with dashes as they're not allowed in resource names
	name := strings.ReplaceAll(strings.Join(nameParts, "-"), ".", "-")

	sum := sha256.Sum256([]byte(name))
	uniqueStr := hex.EncodeToString(sum[:])[:8]

	// room left for the readable prefix after the hash and its separating dash
	prefixLen := maxLen - len(uniqueStr) - 1
	if prefixLen <= 0 {
		// slicing with a negative length would panic, and an empty prefix would
		// leave a leading "-"; fall back to the (possibly truncated) hash alone
		return uniqueStr[:max(0, min(maxLen, len(uniqueStr)))]
	}

	if len(name) > prefixLen {
		name = name[:prefixLen]
	}

	return strings.ToLower(name + "-" + uniqueStr)
}
858

859
func (r *SkyhookReconciler) UpsertNodeLabelsAnnotationsPackages(ctx context.Context, skyhook *wrapper.Skyhook, node *corev1.Node) error {
7✔
860
        // No work to do if there is no labels or annotations for node
7✔
861
        if len(node.Labels) == 0 && len(node.Annotations) == 0 {
7✔
862
                return nil
×
863
        }
×
864

865
        annotations, err := json.Marshal(node.Annotations)
7✔
866
        if err != nil {
7✔
867
                return fmt.Errorf("error converting annotations into byte array: %w", err)
×
868
        }
×
869

870
        labels, err := json.Marshal(node.Labels)
7✔
871
        if err != nil {
7✔
872
                return fmt.Errorf("error converting labels into byte array: %w", err)
×
873
        }
×
874

875
        // marshal intermediary package metadata for the agent
876
        metadata := NewSkyhookMetadata(r.opts, skyhook)
7✔
877
        packages, err := metadata.Marshal()
7✔
878
        if err != nil {
7✔
879
                return fmt.Errorf("error converting packages into byte array: %w", err)
×
880
        }
×
881

882
        configMapName := generateSafeName(253, skyhook.Name, node.Name, "metadata")
7✔
883
        newCM := &corev1.ConfigMap{
7✔
884
                ObjectMeta: metav1.ObjectMeta{
7✔
885
                        Name:      configMapName,
7✔
886
                        Namespace: r.opts.Namespace,
7✔
887
                        Labels: map[string]string{
7✔
888
                                fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhook.Name,
7✔
889
                        },
7✔
890
                        Annotations: map[string]string{
7✔
891
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
7✔
892
                                fmt.Sprintf("%s/Node.name", v1alpha1.METADATA_PREFIX): node.Name,
7✔
893
                        },
7✔
894
                },
7✔
895
                Data: map[string]string{
7✔
896
                        "annotations.json": string(annotations),
7✔
897
                        "labels.json":      string(labels),
7✔
898
                        "packages.json":    string(packages),
7✔
899
                },
7✔
900
        }
7✔
901

7✔
902
        if err := ctrl.SetControllerReference(skyhook.Skyhook, newCM, r.scheme); err != nil {
7✔
903
                return fmt.Errorf("error setting ownership: %w", err)
×
904
        }
×
905

906
        existingConfigMap := &corev1.ConfigMap{}
7✔
907
        err = r.Get(ctx, client.ObjectKey{Namespace: r.opts.Namespace, Name: configMapName}, existingConfigMap)
7✔
908
        if err != nil {
14✔
909
                if apierrors.IsNotFound(err) {
14✔
910
                        // create
7✔
911
                        err := r.Create(ctx, newCM)
7✔
912
                        if err != nil {
7✔
913
                                return fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
914
                        }
×
915
                } else {
×
916
                        return fmt.Errorf("error getting config map: %w", err)
×
917
                }
×
918
        } else {
6✔
919
                if !reflect.DeepEqual(existingConfigMap.Data, newCM.Data) {
12✔
920
                        // update
6✔
921
                        err := r.Update(ctx, newCM)
6✔
922
                        if err != nil {
6✔
923
                                return fmt.Errorf("error updating config map [%s]: %w", newCM.Name, err)
×
924
                        }
×
925
                }
926
        }
927

928
        return nil
7✔
929
}
930

931
// HandleConfigUpdates checks whether the configMap on a package was updated and if it was the configmap will
932
// be updated and the package will be put into config mode if the package is complete or erroring
933
func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes, _package v1alpha1.Package, oldConfigMap, newConfigMap *corev1.ConfigMap) (bool, error) {
6✔
934
        completedNodes, nodeCount := 0, len(skyhook.GetNodes())
6✔
935
        erroringNode := false
6✔
936

6✔
937
        // if configmap changed
6✔
938
        if !reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) {
10✔
939
                for _, node := range skyhook.GetNodes() {
8✔
940
                        exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package)
4✔
941
                        if err != nil {
4✔
942
                                return false, err
×
943
                        }
×
944

945
                        if !exists && node.IsPackageComplete(_package) {
8✔
946
                                completedNodes++
4✔
947
                        }
4✔
948

949
                        // if we have an erroring node in the config, interrupt, or post-interrupt mode
950
                        // then we will restart the config changes
951
                        if packageStatus, found := node.PackageStatus(_package.GetUniqueName()); found {
8✔
952
                                switch packageStatus.Stage {
4✔
953
                                case v1alpha1.StageConfig, v1alpha1.StageInterrupt, v1alpha1.StagePostInterrupt:
4✔
954
                                        if packageStatus.State == v1alpha1.StateErroring {
4✔
955
                                                erroringNode = true
×
956

×
957
                                                // delete the erroring pod from the node so that it can be recreated
×
958
                                                // with the updated configmap
×
959
                                                pods, err := r.dal.GetPods(ctx,
×
960
                                                        client.MatchingFields{
×
961
                                                                "spec.nodeName": node.GetNode().Name,
×
962
                                                        },
×
963
                                                        client.MatchingLabels{
×
964
                                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.GetSkyhook().Name,
×
965
                                                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
×
966
                                                        },
×
967
                                                )
×
968
                                                if err != nil {
×
969
                                                        return false, err
×
970
                                                }
×
971

972
                                                if pods != nil {
×
973
                                                        for _, pod := range pods.Items {
×
974
                                                                err := r.Delete(ctx, &pod)
×
975
                                                                if err != nil {
×
976
                                                                        return false, err
×
977
                                                                }
×
978
                                                        }
979
                                                }
980
                                        }
981
                                }
982
                        }
983
                }
984

985
                // if the update is complete or there is an erroring node put the package back into
986
                // the config mode and update the config map
987
                if completedNodes == nodeCount || erroringNode {
8✔
988
                        // get the keys in the configmap that changed
4✔
989
                        newConfigUpdates := make([]string, 0)
4✔
990
                        for key, new_val := range newConfigMap.Data {
8✔
991
                                if old_val, exists := oldConfigMap.Data[key]; !exists || old_val != new_val {
8✔
992
                                        newConfigUpdates = append(newConfigUpdates, key)
4✔
993
                                }
4✔
994
                        }
995

996
                        // if updates completed then clear out old config updates as they are finished
997
                        if completedNodes == nodeCount {
8✔
998
                                skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
4✔
999
                        }
4✔
1000

1001
                        // Add the new changed keys to the config updates
1002
                        skyhook.GetSkyhook().AddConfigUpdates(_package.Name, newConfigUpdates...)
4✔
1003

4✔
1004
                        for _, node := range skyhook.GetNodes() {
8✔
1005
                                err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageConfig, 0, _package.ContainerSHA)
4✔
1006
                                if err != nil {
4✔
1007
                                        return false, fmt.Errorf("error upserting node status [%s]: %w", node.GetNode().Name, err)
×
1008
                                }
×
1009

1010
                                node.SetStatus(v1alpha1.StatusInProgress)
4✔
1011
                        }
1012

1013
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
4✔
1014
                        if len(errs) > 0 {
4✔
1015
                                return false, utilerrors.NewAggregate(errs)
×
1016
                        }
×
1017

1018
                        // update config map
1019
                        err := r.Update(ctx, newConfigMap)
4✔
1020
                        if err != nil {
4✔
1021
                                return false, fmt.Errorf("error updating config map [%s]: %w", newConfigMap.Name, err)
×
1022
                        }
×
1023

1024
                        return true, nil
4✔
1025
                }
1026
        }
1027

1028
        return false, nil
6✔
1029
}
1030

1031
func (r *SkyhookReconciler) UpsertConfigmaps(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, error) {
6✔
1032
        updated := false
6✔
1033

6✔
1034
        var list corev1.ConfigMapList
6✔
1035
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name})
6✔
1036
        if err != nil {
6✔
1037
                return false, fmt.Errorf("error listing config maps while upserting: %w", err)
×
1038
        }
×
1039

1040
        existingCMs := make(map[string]corev1.ConfigMap)
6✔
1041
        for _, cm := range list.Items {
12✔
1042
                existingCMs[cm.Name] = cm
6✔
1043
        }
6✔
1044

1045
        // clean up from an update
1046
        shouldExist := make(map[string]struct{})
6✔
1047
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
12✔
1048
                shouldExist[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))] = struct{}{}
6✔
1049
        }
6✔
1050

1051
        for k, v := range existingCMs {
12✔
1052
                if _, ok := shouldExist[k]; !ok {
10✔
1053
                        // delete
4✔
1054
                        err := r.Delete(ctx, &v)
4✔
1055
                        if err != nil {
4✔
1056
                                return false, fmt.Errorf("error deleting existing config map [%s] while upserting: %w", v.Name, err)
×
1057
                        }
×
1058
                }
1059
        }
1060

1061
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
12✔
1062
                if len(_package.ConfigMap) > 0 {
12✔
1063

6✔
1064
                        newCM := &corev1.ConfigMap{
6✔
1065
                                ObjectMeta: metav1.ObjectMeta{
6✔
1066
                                        Name:      strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version)),
6✔
1067
                                        Namespace: r.opts.Namespace,
6✔
1068
                                        Labels: map[string]string{
6✔
1069
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
6✔
1070
                                        },
6✔
1071
                                        Annotations: map[string]string{
6✔
1072
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):            skyhook.GetSkyhook().Name,
6✔
1073
                                                fmt.Sprintf("%s/Package.Name", v1alpha1.METADATA_PREFIX):    _package.Name,
6✔
1074
                                                fmt.Sprintf("%s/Package.Version", v1alpha1.METADATA_PREFIX): _package.Version,
6✔
1075
                                        },
6✔
1076
                                },
6✔
1077
                                Data: _package.ConfigMap,
6✔
1078
                        }
6✔
1079
                        // set owner of CM to the SCR, which will clean up the CM in delete of the SCR
6✔
1080
                        if err := ctrl.SetControllerReference(skyhook.GetSkyhook().Skyhook, newCM, r.scheme); err != nil {
6✔
1081
                                return false, fmt.Errorf("error setting ownership of cm: %w", err)
×
1082
                        }
×
1083

1084
                        if existingCM, ok := existingCMs[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))]; ok {
12✔
1085
                                updatedConfigMap, err := r.HandleConfigUpdates(ctx, clusterState, skyhook, _package, &existingCM, newCM)
6✔
1086
                                if err != nil {
6✔
1087
                                        return false, fmt.Errorf("error updating config map [%s]: %s", newCM.Name, err)
×
1088
                                }
×
1089
                                if updatedConfigMap {
10✔
1090
                                        updated = true
4✔
1091
                                }
4✔
1092
                        } else {
6✔
1093
                                // create
6✔
1094
                                err := r.Create(ctx, newCM)
6✔
1095
                                if err != nil {
6✔
1096
                                        return false, fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
1097
                                }
×
1098
                        }
1099
                }
1100
        }
1101

1102
        return updated, nil
6✔
1103
}
1104

1105
func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
4✔
1106

4✔
1107
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
4✔
1108
                "spec.nodeName": skyhookNode.GetNode().Name,
4✔
1109
        })
4✔
1110
        if err != nil {
4✔
1111
                return false, err
×
1112
        }
×
1113

1114
        if pods == nil || len(pods.Items) == 0 {
4✔
1115
                return true, nil
×
1116
        }
×
1117

1118
        // checking for any running or pending pods with no toleration to unschedulable
1119
        // if its has an unschedulable toleration we can ignore
1120
        for _, pod := range pods.Items {
8✔
1121

4✔
1122
                if ShouldEvict(&pod) {
8✔
1123
                        return false, nil
4✔
1124
                }
4✔
1125

1126
        }
1127

1128
        return true, nil
4✔
1129
}
1130

1131
func ShouldEvict(pod *corev1.Pod) bool {
4✔
1132
        switch pod.Status.Phase {
4✔
1133
        case corev1.PodRunning, corev1.PodPending:
4✔
1134

4✔
1135
                for _, taint := range pod.Spec.Tolerations {
8✔
1136
                        switch taint.Key {
4✔
1137
                        case "node.kubernetes.io/unschedulable": // ignoring
4✔
1138
                                return false
4✔
1139
                        }
1140
                }
1141

1142
                if len(pod.ObjectMeta.OwnerReferences) > 1 {
4✔
1143
                        for _, owner := range pod.ObjectMeta.OwnerReferences {
×
1144
                                if owner.Kind == "DaemonSet" { // ignoring
×
1145
                                        return false
×
1146
                                }
×
1147
                        }
1148
                }
1149

1150
                if pod.GetNamespace() == "kube-system" {
4✔
1151
                        return false
×
1152
                }
×
1153

1154
                return true
4✔
1155
        }
1156
        return false
4✔
1157
}
1158

1159
// HandleFinalizer returns true only if we container is deleted and we handled it completely, else false
1160
func (r *SkyhookReconciler) HandleFinalizer(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
6✔
1161
        if skyhook.GetSkyhook().DeletionTimestamp.IsZero() { // if not deleted, and does not have our finalizer, add it
12✔
1162
                if !controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
12✔
1163
                        controllerutil.AddFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
6✔
1164

6✔
1165
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
6✔
1166
                                return false, fmt.Errorf("error updating skyhook to add finalizer: %w", err)
×
1167
                        }
×
1168
                }
1169
        } else { // being delete, time to handle our
6✔
1170
                if controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
12✔
1171

6✔
1172
                        errs := make([]error, 0)
6✔
1173

6✔
1174
                        // zero out all the metrics related to this skyhook both skyhook and packages
6✔
1175
                        zeroOutSkyhookMetrics(skyhook)
6✔
1176

6✔
1177
                        for _, node := range skyhook.GetNodes() {
12✔
1178
                                patch := client.StrategicMergeFrom(node.GetNode().DeepCopy())
6✔
1179

6✔
1180
                                node.Uncordon()
6✔
1181

6✔
1182
                                // if this doesn't change the node then don't patch
6✔
1183
                                if !node.Changed() {
12✔
1184
                                        continue
6✔
1185
                                }
1186

1187
                                err := r.Patch(ctx, node.GetNode(), patch)
3✔
1188
                                if err != nil {
3✔
1189
                                        errs = append(errs, fmt.Errorf("error patching node [%s] in finalizer: %w", node.GetNode().Name, err))
×
1190
                                }
×
1191
                        }
1192

1193
                        if len(errs) > 0 { // we errored, so we need to return error, otherwise we would release the skyhook when we didnt finish
6✔
1194
                                return false, utilerrors.NewAggregate(errs)
×
1195
                        }
×
1196

1197
                        controllerutil.RemoveFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
6✔
1198
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
8✔
1199
                                return false, fmt.Errorf("error updating skyhook removing finalizer: %w", err)
2✔
1200
                        }
2✔
1201
                        // should be 1, and now 2. we want to set ObservedGeneration up to not trigger an logic from this update adding the finalizer
1202
                        skyhook.GetSkyhook().Status.ObservedGeneration = skyhook.GetSkyhook().Status.ObservedGeneration + 1
6✔
1203

6✔
1204
                        if err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
12✔
1205
                                return false, fmt.Errorf("error updating skyhook status: %w", err)
6✔
1206
                        }
6✔
1207

1208
                        return true, nil
×
1209
                }
1210
        }
1211
        return false, nil
6✔
1212
}
1213

1214
// HasNonInterruptWork returns true if pods are running on the node that are either packages, or matches the SCR selector
1215
func (r *SkyhookReconciler) HasNonInterruptWork(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
4✔
1216

4✔
1217
        selector, err := metav1.LabelSelectorAsSelector(&skyhookNode.GetSkyhook().Spec.PodNonInterruptLabels)
4✔
1218
        if err != nil {
4✔
1219
                return false, fmt.Errorf("error creating selector: %w", err)
×
1220
        }
×
1221

1222
        if selector.Empty() { // when selector is empty it does not do any selecting, ie will return all pods on node.
8✔
1223
                return false, nil
4✔
1224
        }
4✔
1225

1226
        pods, err := r.dal.GetPods(ctx,
4✔
1227
                client.MatchingLabelsSelector{Selector: selector},
4✔
1228
                client.MatchingFields{
4✔
1229
                        "spec.nodeName": skyhookNode.GetNode().Name,
4✔
1230
                },
4✔
1231
        )
4✔
1232
        if err != nil {
4✔
1233
                return false, fmt.Errorf("error getting pods: %w", err)
×
1234
        }
×
1235

1236
        if pods == nil || len(pods.Items) == 0 {
8✔
1237
                return false, nil
4✔
1238
        }
4✔
1239

1240
        for _, pod := range pods.Items {
8✔
1241
                switch pod.Status.Phase {
4✔
1242
                case corev1.PodRunning, corev1.PodPending:
4✔
1243
                        return true, nil
4✔
1244
                }
1245
        }
1246

1247
        return false, nil
×
1248
}
1249

1250
func (r *SkyhookReconciler) HasRunningPackages(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
4✔
1251
        pods, err := r.dal.GetPods(ctx,
4✔
1252
                client.HasLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX)},
4✔
1253
                client.MatchingFields{
4✔
1254
                        "spec.nodeName": skyhookNode.GetNode().Name,
4✔
1255
                },
4✔
1256
        )
4✔
1257
        if err != nil {
4✔
1258
                return false, fmt.Errorf("error getting pods: %w", err)
×
1259
        }
×
1260

1261
        return pods != nil && len(pods.Items) > 0, nil
4✔
1262
}
1263

1264
func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
4✔
1265
        drained, err := r.IsDrained(ctx, skyhookNode)
4✔
1266
        if err != nil {
4✔
1267
                return false, err
×
1268
        }
×
1269
        if drained {
8✔
1270
                return true, nil
4✔
1271
        }
4✔
1272

1273
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
4✔
1274
                "spec.nodeName": skyhookNode.GetNode().Name,
4✔
1275
        })
4✔
1276
        if err != nil {
4✔
1277
                return false, err
×
1278
        }
×
1279

1280
        if pods == nil || len(pods.Items) == 0 {
4✔
1281
                return true, nil
×
1282
        }
×
1283

1284
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookInterrupt,
4✔
1285
                "draining node [%s] package [%s:%s] from [skyhook:%s]",
4✔
1286
                skyhookNode.GetNode().Name,
4✔
1287
                _package.Name,
4✔
1288
                _package.Version,
4✔
1289
                skyhookNode.GetSkyhook().Name,
4✔
1290
        )
4✔
1291

4✔
1292
        errs := make([]error, 0)
4✔
1293
        for _, pod := range pods.Items {
8✔
1294

4✔
1295
                if ShouldEvict(&pod) {
8✔
1296
                        eviction := policyv1.Eviction{}
4✔
1297
                        err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction)
4✔
1298
                        if err != nil {
4✔
1299
                                errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err))
×
1300
                        }
×
1301
                }
1302
        }
1303

1304
        return len(errs) == 0, utilerrors.NewAggregate(errs)
4✔
1305
}
1306

1307
// Interrupt should not be called unless safe to do so, IE already cordoned and drained
1308
func (r *SkyhookReconciler) Interrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, _interrupt *v1alpha1.Interrupt) error {
4✔
1309

4✔
1310
        hasPackagesRunning, err := r.HasRunningPackages(ctx, skyhookNode)
4✔
1311
        if err != nil {
4✔
1312
                return err
×
1313
        }
×
1314

1315
        if hasPackagesRunning { // keep waiting...
8✔
1316
                return nil
4✔
1317
        }
4✔
1318

1319
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
4✔
1320
        if err != nil {
4✔
1321
                return err
×
1322
        }
×
1323
        if exists {
4✔
1324
                // nothing to do here, already running
×
1325
                return nil
×
1326
        }
×
1327

1328
        // Ensure the node metadata configmap exists before creating the pod
1329
        // This prevents a race where the pod starts before its required configmap is created
1330
        if err := r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhookNode.GetSkyhook(), skyhookNode.GetNode()); err != nil {
4✔
1331
                return fmt.Errorf("error upserting node metadata configmap: %w", err)
×
1332
        }
×
1333

1334
        argEncode, err := _interrupt.ToArgs()
4✔
1335
        if err != nil {
4✔
1336
                return fmt.Errorf("error creating interrupt args: %w", err)
×
1337
        }
×
1338

1339
        pod := createInterruptPodForPackage(r.opts, _interrupt, argEncode, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name)
4✔
1340

4✔
1341
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, v1alpha1.StageInterrupt, _package); err != nil {
4✔
1342
                return fmt.Errorf("error setting package on interrupt: %w", err)
×
1343
        }
×
1344

1345
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
4✔
1346
                return fmt.Errorf("error setting ownership: %w", err)
×
1347
        }
×
1348

1349
        if err := r.Create(ctx, pod); err != nil {
4✔
1350
                return fmt.Errorf("error creating interruption pod: %w", err)
×
1351
        }
×
1352

1353
        _ = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageInterrupt, 0, _package.ContainerSHA)
4✔
1354

4✔
1355
        r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonSkyhookInterrupt,
4✔
1356
                "Interrupting node [%s] package [%s:%s] from [skyhook:%s]",
4✔
1357
                skyhookNode.GetNode().Name,
4✔
1358
                _package.Name,
4✔
1359
                _package.Version,
4✔
1360
                skyhookNode.GetSkyhook().Name)
4✔
1361

4✔
1362
        return nil
4✔
1363
}
1364

1365
// fudgeInterruptWithPriority takes a list of packages, interrupts, and configUpdates and returns the correct merged interrupt to run to handle all the packages
1366
func fudgeInterruptWithPriority(next []*v1alpha1.Package, configUpdates map[string][]string, interrupts map[string][]*v1alpha1.Interrupt) (*v1alpha1.Interrupt, string) {
7✔
1367
        var ret *v1alpha1.Interrupt
7✔
1368
        var pack string
7✔
1369

7✔
1370
        // map interrupt to priority
7✔
1371
        // A lower priority value means a higher priority and will be used in favor of anything with a higher value
7✔
1372
        var priorities = map[v1alpha1.InterruptType]int{
7✔
1373
                v1alpha1.REBOOT:               0,
7✔
1374
                v1alpha1.RESTART_ALL_SERVICES: 1,
7✔
1375
                v1alpha1.SERVICE:              2,
7✔
1376
                v1alpha1.NOOP:                 3,
7✔
1377
        }
7✔
1378

7✔
1379
        for _, _package := range next {
14✔
1380

7✔
1381
                if len(configUpdates[_package.Name]) == 0 {
14✔
1382
                        interrupts[_package.Name] = []*v1alpha1.Interrupt{}
7✔
1383
                        if _package.HasInterrupt() {
12✔
1384
                                interrupts[_package.Name] = append(interrupts[_package.Name], _package.Interrupt)
5✔
1385
                        }
5✔
1386
                }
1387
        }
1388

1389
        packageNames := make([]string, 0, len(next))
7✔
1390
        for _, pkg := range next {
14✔
1391
                packageNames = append(packageNames, pkg.Name)
7✔
1392
        }
7✔
1393
        sort.Strings(packageNames)
7✔
1394

7✔
1395
        for _, _package := range packageNames {
14✔
1396
                _interrupts, ok := interrupts[_package]
7✔
1397
                if !ok {
12✔
1398
                        continue
5✔
1399
                }
1400

1401
                for _, interrupt := range _interrupts {
12✔
1402
                        if ret == nil { // prime ret, base case
10✔
1403
                                ret = interrupt
5✔
1404
                                pack = _package
5✔
1405
                        }
5✔
1406

1407
                        // short circuit, reboot has highest priority
1408
                        switch interrupt.Type {
5✔
1409
                        case v1alpha1.REBOOT:
5✔
1410
                                return interrupt, _package
5✔
1411
                        }
1412

1413
                        // check if interrupt is higher priority using the priority_order
1414
                        // A lower priority value means a higher priority
1415
                        if priorities[interrupt.Type] < priorities[ret.Type] {
6✔
1416
                                ret = interrupt
1✔
1417
                                pack = _package
1✔
1418
                        } else if priorities[interrupt.Type] == priorities[ret.Type] {
11✔
1419
                                mergeInterrupt(ret, interrupt)
5✔
1420
                        }
5✔
1421
                }
1422
        }
1423

1424
        return ret, pack // return merged interrupt and package
7✔
1425
}
1426

1427
func mergeInterrupt(left, right *v1alpha1.Interrupt) {
5✔
1428

5✔
1429
        // make sure both are of type service
5✔
1430
        if left.Type != v1alpha1.SERVICE || right.Type != v1alpha1.SERVICE {
6✔
1431
                return
1✔
1432
        }
1✔
1433

1434
        left.Services = merge(left.Services, right.Services)
5✔
1435
}
1436

1437
// merge returns a new, ascending-sorted slice containing every element of left
// (duplicates already in left are kept) plus each element of right that is not
// already present in left or earlier in right.
//
// Neither input is modified. The previous implementation appended to and sorted
// left in place, which could silently rewrite the caller's slice (for example a
// package's service list) through a shared backing array.
func merge[T cmp.Ordered](left, right []T) []T {
	out := slices.Clone(left)
	for _, r := range right {
		if !slices.Contains(out, r) {
			out = append(out, r)
		}
	}
	slices.Sort(out)
	return out
}
1446

1447
// ValidateNodeConfigmaps validates that there are no orphaned or stale config maps for a node
1448
func (r *SkyhookReconciler) ValidateNodeConfigmaps(ctx context.Context, skyhookName string, nodes []wrapper.SkyhookNode) (bool, error) {
6✔
1449
        var list corev1.ConfigMapList
6✔
1450
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhookName})
6✔
1451
        if err != nil {
6✔
1452
                return false, fmt.Errorf("error listing config maps: %w", err)
×
1453
        }
×
1454

1455
        // No configmaps created by this skyhook, no work needs to be done
1456
        if len(list.Items) == 0 {
12✔
1457
                return false, nil
6✔
1458
        }
6✔
1459

1460
        existingCMs := make(map[string]corev1.ConfigMap)
6✔
1461
        for _, cm := range list.Items {
12✔
1462
                existingCMs[cm.Name] = cm
6✔
1463
        }
6✔
1464

1465
        shouldExist := make(map[string]struct{})
6✔
1466
        for _, node := range nodes {
12✔
1467
                shouldExist[generateSafeName(253, skyhookName, node.GetNode().Name, "metadata")] = struct{}{}
6✔
1468
        }
6✔
1469

1470
        update := false
6✔
1471
        errs := make([]error, 0)
6✔
1472
        for k, v := range existingCMs {
12✔
1473
                if _, ok := shouldExist[k]; !ok {
6✔
1474
                        update = true
×
1475
                        err := r.Delete(ctx, &v)
×
1476
                        if err != nil {
×
1477
                                errs = append(errs, fmt.Errorf("error deleting existing config map [%s]: %w", v.Name, err))
×
1478
                        }
×
1479
                }
1480
        }
1481

1482
        // Ensure packages.json is present and up-to-date for expected configmaps
1483
        skyhookCR, err := r.dal.GetSkyhook(ctx, skyhookName)
6✔
1484
        if err != nil {
6✔
1485
                return update, fmt.Errorf("error getting skyhook for metadata validation: %w", err)
×
1486
        }
×
1487
        skyhookWrapper := wrapper.NewSkyhookWrapper(skyhookCR)
6✔
1488
        metadata := NewSkyhookMetadata(r.opts, skyhookWrapper)
6✔
1489
        expectedBytes, err := metadata.Marshal()
6✔
1490
        if err != nil {
6✔
1491
                return update, fmt.Errorf("error marshalling metadata for validation: %w", err)
×
1492
        }
×
1493
        expected := string(expectedBytes)
6✔
1494

6✔
1495
        for i := range list.Items {
12✔
1496
                cm := &list.Items[i]
6✔
1497
                if _, ok := shouldExist[cm.Name]; !ok {
6✔
1498
                        continue
×
1499
                }
1500
                if cm.Data == nil {
6✔
1501
                        cm.Data = make(map[string]string)
×
1502
                }
×
1503
                if cm.Data["packages.json"] != expected {
10✔
1504
                        cm.Data["packages.json"] = expected
4✔
1505
                        if err := r.Update(ctx, cm); err != nil {
4✔
1506
                                errs = append(errs, fmt.Errorf("error updating packages.json on config map [%s]: %w", cm.Name, err))
×
1507
                        } else {
4✔
1508
                                update = true
4✔
1509
                        }
4✔
1510
                }
1511
        }
1512

1513
        return update, utilerrors.NewAggregate(errs)
6✔
1514
}
1515

1516
// PodExists tests if this package is exists on a node.
1517
func (r *SkyhookReconciler) PodExists(ctx context.Context, nodeName, skyhookName string, _package *v1alpha1.Package) (bool, error) {
6✔
1518

6✔
1519
        pods, err := r.dal.GetPods(ctx,
6✔
1520
                client.MatchingFields{
6✔
1521
                        "spec.nodeName": nodeName,
6✔
1522
                },
6✔
1523
                client.MatchingLabels{
6✔
1524
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhookName,
6✔
1525
                        fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
6✔
1526
                },
6✔
1527
        )
6✔
1528
        if err != nil {
6✔
1529
                return false, fmt.Errorf("error check from existing pods: %w", err)
×
1530
        }
×
1531

1532
        if pods == nil || len(pods.Items) == 0 {
12✔
1533
                return false, nil
6✔
1534
        }
6✔
1535
        return true, nil
6✔
1536
}
1537

1538
// createInterruptPodForPackage returns the pod spec for an interrupt pod given an package
1539
func createInterruptPodForPackage(opts SkyhookOperatorOptions, _interrupt *v1alpha1.Interrupt, argEncode string, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string) *corev1.Pod {
5✔
1540
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
5✔
1541
                opts.CopyDirRoot,
5✔
1542
                skyhook.Name,
5✔
1543
                _package.Name,
5✔
1544
                _package.Version,
5✔
1545
                skyhook.UID,
5✔
1546
                skyhook.Generation,
5✔
1547
        )
5✔
1548

5✔
1549
        volumes := []corev1.Volume{
5✔
1550
                {
5✔
1551
                        Name: "root-mount",
5✔
1552
                        VolumeSource: corev1.VolumeSource{
5✔
1553
                                HostPath: &corev1.HostPathVolumeSource{
5✔
1554
                                        Path: "/",
5✔
1555
                                },
5✔
1556
                        },
5✔
1557
                },
5✔
1558
                {
5✔
1559
                        // node names in different CSPs might include dots which isn't allowed in volume names
5✔
1560
                        // so we have to replace all dots with dashes
5✔
1561
                        Name: generateSafeName(63, skyhook.Name, nodeName, "metadata"),
5✔
1562
                        VolumeSource: corev1.VolumeSource{
5✔
1563
                                ConfigMap: &corev1.ConfigMapVolumeSource{
5✔
1564
                                        LocalObjectReference: corev1.LocalObjectReference{
5✔
1565
                                                Name: strings.ReplaceAll(fmt.Sprintf("%s-%s-metadata", skyhook.Name, nodeName), ".", "-"),
5✔
1566
                                        },
5✔
1567
                                },
5✔
1568
                        },
5✔
1569
                },
5✔
1570
        }
5✔
1571
        volumeMounts := []corev1.VolumeMount{
5✔
1572
                {
5✔
1573
                        Name:             "root-mount",
5✔
1574
                        MountPath:        "/root",
5✔
1575
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
5✔
1576
                },
5✔
1577
        }
5✔
1578

5✔
1579
        pod := &corev1.Pod{
5✔
1580
                ObjectMeta: metav1.ObjectMeta{
5✔
1581
                        Name:      generateSafeName(63, skyhook.Name, "interrupt", string(_interrupt.Type), nodeName),
5✔
1582
                        Namespace: opts.Namespace,
5✔
1583
                        Labels: map[string]string{
5✔
1584
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
5✔
1585
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX):   fmt.Sprintf("%s-%s", _package.Name, _package.Version),
5✔
1586
                                fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX): "True",
5✔
1587
                        },
5✔
1588
                },
5✔
1589
                Spec: corev1.PodSpec{
5✔
1590
                        NodeName:      nodeName,
5✔
1591
                        RestartPolicy: corev1.RestartPolicyOnFailure,
5✔
1592
                        InitContainers: []corev1.Container{
5✔
1593
                                {
5✔
1594
                                        Name:  InterruptContainerName,
5✔
1595
                                        Image: getAgentImage(opts, _package),
5✔
1596
                                        Args:  []string{"interrupt", "/root", copyDir, argEncode},
5✔
1597
                                        Env:   getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name),
5✔
1598
                                        SecurityContext: &corev1.SecurityContext{
5✔
1599
                                                Privileged: ptr(true),
5✔
1600
                                        },
5✔
1601
                                        VolumeMounts: volumeMounts,
5✔
1602
                                        Resources: corev1.ResourceRequirements{
5✔
1603
                                                Limits: corev1.ResourceList{
5✔
1604
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
5✔
1605
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
5✔
1606
                                                },
5✔
1607
                                                Requests: corev1.ResourceList{
5✔
1608
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
5✔
1609
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
5✔
1610
                                                },
5✔
1611
                                        },
5✔
1612
                                },
5✔
1613
                        },
5✔
1614
                        Containers: []corev1.Container{
5✔
1615
                                {
5✔
1616
                                        Name:  "pause",
5✔
1617
                                        Image: opts.PauseImage,
5✔
1618
                                        Resources: corev1.ResourceRequirements{
5✔
1619
                                                Limits: corev1.ResourceList{
5✔
1620
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
5✔
1621
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
5✔
1622
                                                },
5✔
1623
                                                Requests: corev1.ResourceList{
5✔
1624
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
5✔
1625
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
5✔
1626
                                                },
5✔
1627
                                        },
5✔
1628
                                },
5✔
1629
                        },
5✔
1630
                        HostPID:     true,
5✔
1631
                        HostNetwork: true,
5✔
1632
                        // If you change these go change the SelectNode toleration in cluster_state.go
5✔
1633
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
5✔
1634
                                {
5✔
1635
                                        Key:      TaintUnschedulable,
5✔
1636
                                        Operator: corev1.TolerationOpExists,
5✔
1637
                                },
5✔
1638
                                opts.GetRuntimeRequiredToleration(),
5✔
1639
                        }, skyhook.Spec.AdditionalTolerations...),
5✔
1640
                        Volumes: volumes,
5✔
1641
                },
5✔
1642
        }
5✔
1643
        if opts.ImagePullSecret != "" {
5✔
1644
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1645
                        {
×
1646
                                Name: opts.ImagePullSecret,
×
1647
                        },
×
1648
                }
×
1649
        }
×
1650
        return pod
5✔
1651
}
1652

1653
// trunstr returns str cut to at most length bytes.
//
// If the cut would land inside a multi-byte UTF-8 sequence, it moves back to the
// preceding rune boundary, so the result is never an invalid string. It may
// therefore be a few bytes shorter than length. A non-positive length yields ""
// instead of panicking.
func trunstr(str string, length int) string {
	if length <= 0 {
		return ""
	}
	if len(str) <= length {
		return str
	}
	// A byte of the form 10xxxxxx is a UTF-8 continuation byte; cutting right
	// before one would split a rune, so step back until the cut is on a boundary.
	for length > 0 && str[length]&0xC0 == 0x80 {
		length--
	}
	return str[:length]
}
1659

1660
func getAgentImage(opts SkyhookOperatorOptions, _package *v1alpha1.Package) string {
7✔
1661
        if _package.AgentImageOverride != "" {
11✔
1662
                return _package.AgentImageOverride
4✔
1663
        }
4✔
1664
        return opts.AgentImage
7✔
1665
}
1666

1667
// getPackageImage returns the full image reference for a package, using the digest if specified
1668
func getPackageImage(_package *v1alpha1.Package) string {
7✔
1669
        if _package.ContainerSHA != "" {
11✔
1670
                // When containerSHA is specified, use it instead of the version tag for immutable image reference
4✔
1671
                return fmt.Sprintf("%s@%s", _package.Image, _package.ContainerSHA)
4✔
1672
        }
4✔
1673
        // Fall back to version tag
1674
        return fmt.Sprintf("%s:%s", _package.Image, _package.Version)
7✔
1675
}
1676

1677
func getAgentConfigEnvVars(opts SkyhookOperatorOptions, packageName string, packageVersion string, resourceID string, skyhookName string) []corev1.EnvVar {
7✔
1678
        return []corev1.EnvVar{
7✔
1679
                {
7✔
1680
                        Name:  "SKYHOOK_LOG_DIR",
7✔
1681
                        Value: fmt.Sprintf("%s/%s", opts.AgentLogRoot, skyhookName),
7✔
1682
                },
7✔
1683
                {
7✔
1684
                        Name:  "SKYHOOK_ROOT_DIR",
7✔
1685
                        Value: fmt.Sprintf("%s/%s", opts.CopyDirRoot, skyhookName),
7✔
1686
                },
7✔
1687
                {
7✔
1688
                        Name:  "COPY_RESOLV",
7✔
1689
                        Value: "false",
7✔
1690
                },
7✔
1691
                {
7✔
1692
                        Name:  "SKYHOOK_RESOURCE_ID",
7✔
1693
                        Value: fmt.Sprintf("%s_%s_%s", resourceID, packageName, packageVersion),
7✔
1694
                },
7✔
1695
        }
7✔
1696
}
7✔
1697

1698
// createPodFromPackage creates a pod spec for a skyhook pod for a given package
1699
func createPodFromPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string, stage v1alpha1.Stage) *corev1.Pod {
7✔
1700
        // Generate consistent names that won't exceed k8s limits
7✔
1701
        volumeName := generateSafeName(63, "metadata", nodeName)
7✔
1702
        configMapName := generateSafeName(253, skyhook.Name, nodeName, "metadata")
7✔
1703

7✔
1704
        volumes := []corev1.Volume{
7✔
1705
                {
7✔
1706
                        Name: "root-mount",
7✔
1707
                        VolumeSource: corev1.VolumeSource{
7✔
1708
                                HostPath: &corev1.HostPathVolumeSource{
7✔
1709
                                        Path: "/",
7✔
1710
                                },
7✔
1711
                        },
7✔
1712
                },
7✔
1713
                {
7✔
1714
                        Name: volumeName,
7✔
1715
                        VolumeSource: corev1.VolumeSource{
7✔
1716
                                ConfigMap: &corev1.ConfigMapVolumeSource{
7✔
1717
                                        LocalObjectReference: corev1.LocalObjectReference{
7✔
1718
                                                Name: configMapName,
7✔
1719
                                        },
7✔
1720
                                },
7✔
1721
                        },
7✔
1722
                },
7✔
1723
        }
7✔
1724

7✔
1725
        volumeMounts := []corev1.VolumeMount{
7✔
1726
                {
7✔
1727
                        Name:             "root-mount",
7✔
1728
                        MountPath:        "/root",
7✔
1729
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
7✔
1730
                },
7✔
1731
                {
7✔
1732
                        Name:      volumeName,
7✔
1733
                        MountPath: "/skyhook-package/node-metadata",
7✔
1734
                },
7✔
1735
        }
7✔
1736

7✔
1737
        if len(_package.ConfigMap) > 0 {
13✔
1738
                volumeMounts = append(volumeMounts, corev1.VolumeMount{
6✔
1739
                        Name:      _package.Name,
6✔
1740
                        MountPath: "/skyhook-package/configmaps",
6✔
1741
                })
6✔
1742

6✔
1743
                volumes = append(volumes, corev1.Volume{
6✔
1744
                        Name: _package.Name,
6✔
1745
                        VolumeSource: corev1.VolumeSource{
6✔
1746
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1747
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1748
                                                Name: strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.Name, _package.Name, _package.Version)),
6✔
1749
                                        },
6✔
1750
                                },
6✔
1751
                        },
6✔
1752
                })
6✔
1753
        }
6✔
1754

1755
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
7✔
1756
                opts.CopyDirRoot,
7✔
1757
                skyhook.Name,
7✔
1758
                _package.Name,
7✔
1759
                _package.Version,
7✔
1760
                skyhook.UID,
7✔
1761
                skyhook.Generation,
7✔
1762
        )
7✔
1763
        applyargs := []string{strings.ToLower(string(stage)), "/root", copyDir}
7✔
1764
        checkargs := []string{strings.ToLower(string(stage) + "-check"), "/root", copyDir}
7✔
1765

7✔
1766
        agentEnvs := append(
7✔
1767
                _package.Env,
7✔
1768
                getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name)...,
7✔
1769
        )
7✔
1770

7✔
1771
        pod := &corev1.Pod{
7✔
1772
                ObjectMeta: metav1.ObjectMeta{
7✔
1773
                        Name:      generateSafeName(63, skyhook.Name, _package.Name, _package.Version, string(stage), nodeName),
7✔
1774
                        Namespace: opts.Namespace,
7✔
1775
                        Labels: map[string]string{
7✔
1776
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.Name,
7✔
1777
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
7✔
1778
                        },
7✔
1779
                },
7✔
1780
                Spec: corev1.PodSpec{
7✔
1781
                        NodeName:      nodeName,
7✔
1782
                        RestartPolicy: corev1.RestartPolicyOnFailure,
7✔
1783
                        InitContainers: []corev1.Container{
7✔
1784
                                {
7✔
1785
                                        Name:            fmt.Sprintf("%s-init", trunstr(_package.Name, 43)),
7✔
1786
                                        Image:           getPackageImage(_package),
7✔
1787
                                        ImagePullPolicy: "Always",
7✔
1788
                                        Command:         []string{"/bin/sh"},
7✔
1789
                                        Args: []string{
7✔
1790
                                                "-c",
7✔
1791
                                                "mkdir -p /root/${SKYHOOK_DIR} && cp -r /skyhook-package/* /root/${SKYHOOK_DIR}",
7✔
1792
                                        },
7✔
1793
                                        Env: []corev1.EnvVar{
7✔
1794
                                                {
7✔
1795
                                                        Name:  "SKYHOOK_DIR",
7✔
1796
                                                        Value: copyDir,
7✔
1797
                                                },
7✔
1798
                                        },
7✔
1799
                                        SecurityContext: &corev1.SecurityContext{
7✔
1800
                                                Privileged: ptr(true),
7✔
1801
                                        },
7✔
1802
                                        VolumeMounts: volumeMounts,
7✔
1803
                                },
7✔
1804
                                {
7✔
1805
                                        Name:            fmt.Sprintf("%s-%s", trunstr(_package.Name, 43), stage),
7✔
1806
                                        Image:           getAgentImage(opts, _package),
7✔
1807
                                        ImagePullPolicy: "Always",
7✔
1808
                                        Args:            applyargs,
7✔
1809
                                        Env:             agentEnvs,
7✔
1810
                                        SecurityContext: &corev1.SecurityContext{
7✔
1811
                                                Privileged: ptr(true),
7✔
1812
                                        },
7✔
1813
                                        VolumeMounts: volumeMounts,
7✔
1814
                                },
7✔
1815
                                {
7✔
1816
                                        Name:            fmt.Sprintf("%s-%scheck", trunstr(_package.Name, 43), stage),
7✔
1817
                                        Image:           getAgentImage(opts, _package),
7✔
1818
                                        ImagePullPolicy: "Always",
7✔
1819
                                        Args:            checkargs,
7✔
1820
                                        Env:             agentEnvs,
7✔
1821
                                        SecurityContext: &corev1.SecurityContext{
7✔
1822
                                                Privileged: ptr(true),
7✔
1823
                                        },
7✔
1824
                                        VolumeMounts: volumeMounts,
7✔
1825
                                },
7✔
1826
                        },
7✔
1827
                        Containers: []corev1.Container{
7✔
1828
                                {
7✔
1829
                                        Name:  "pause",
7✔
1830
                                        Image: opts.PauseImage,
7✔
1831
                                        Resources: corev1.ResourceRequirements{
7✔
1832
                                                Limits: corev1.ResourceList{
7✔
1833
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
7✔
1834
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
7✔
1835
                                                },
7✔
1836
                                                Requests: corev1.ResourceList{
7✔
1837
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
7✔
1838
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
7✔
1839
                                                },
7✔
1840
                                        },
7✔
1841
                                },
7✔
1842
                        },
7✔
1843
                        Volumes:     volumes,
7✔
1844
                        HostPID:     true,
7✔
1845
                        HostNetwork: true,
7✔
1846
                        // If you change these go change the SelectNode toleration in cluster_state.go
7✔
1847
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
7✔
1848
                                {
7✔
1849
                                        Key:      TaintUnschedulable,
7✔
1850
                                        Operator: corev1.TolerationOpExists,
7✔
1851
                                },
7✔
1852
                                opts.GetRuntimeRequiredToleration(),
7✔
1853
                        }, skyhook.Spec.AdditionalTolerations...),
7✔
1854
                },
7✔
1855
        }
7✔
1856
        if opts.ImagePullSecret != "" {
7✔
1857
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1858
                        {
×
1859
                                Name: opts.ImagePullSecret,
×
1860
                        },
×
1861
                }
×
1862
        }
×
1863
        if _package.GracefulShutdown != nil {
11✔
1864
                pod.Spec.TerminationGracePeriodSeconds = ptr(int64(_package.GracefulShutdown.Duration.Seconds()))
4✔
1865
        }
4✔
1866
        setPodResources(pod, _package.Resources)
7✔
1867
        return pod
7✔
1868
}
1869

1870
// FilterEnv removes the environment variables passed into exlude
1871
func FilterEnv(envs []corev1.EnvVar, exclude ...string) []corev1.EnvVar {
7✔
1872
        var filteredEnv []corev1.EnvVar
7✔
1873

7✔
1874
        // build map of exclude strings for faster lookup
7✔
1875
        excludeMap := make(map[string]struct{})
7✔
1876
        for _, name := range exclude {
14✔
1877
                excludeMap[name] = struct{}{}
7✔
1878
        }
7✔
1879

1880
        // If the environment variable name is in the exclude list, skip it
1881
        // otherwise append it to the final list
1882
        for _, env := range envs {
14✔
1883
                if _, found := excludeMap[env.Name]; !found {
14✔
1884
                        filteredEnv = append(filteredEnv, env)
7✔
1885
                }
7✔
1886
        }
1887

1888
        return filteredEnv
7✔
1889
}
1890

1891
// PodMatchesPackage asserts that a given pod matches the given pod spec
1892
func podMatchesPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, pod corev1.Pod, skyhook *wrapper.Skyhook, stage v1alpha1.Stage) bool {
7✔
1893
        var expectedPod *corev1.Pod
7✔
1894

7✔
1895
        // need to differentiate whether the pod is for an interrupt or not so we know
7✔
1896
        // what to expect and how to compare them
7✔
1897
        isInterrupt := false
7✔
1898
        _, limitRange := pod.Annotations["kubernetes.io/limit-ranger"]
7✔
1899

7✔
1900
        if pod.Labels[fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX)] == "True" {
12✔
1901
                expectedPod = createInterruptPodForPackage(opts, &v1alpha1.Interrupt{}, "", _package, skyhook, "")
5✔
1902
                isInterrupt = true
5✔
1903
        } else {
12✔
1904
                expectedPod = createPodFromPackage(opts, _package, skyhook, "", stage)
7✔
1905
        }
7✔
1906

1907
        actualPod := pod.DeepCopy()
7✔
1908

7✔
1909
        // check to see whether the name or the version of the package changed
7✔
1910
        packageLabel := fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX)
7✔
1911
        if actualPod.Labels[packageLabel] != expectedPod.Labels[packageLabel] {
12✔
1912
                return false
5✔
1913
        }
5✔
1914

1915
        // compare initContainers since this is where a lot of the important info lives
1916
        for i := range actualPod.Spec.InitContainers {
14✔
1917
                expectedContainer := expectedPod.Spec.InitContainers[i]
7✔
1918
                actualContainer := actualPod.Spec.InitContainers[i]
7✔
1919

7✔
1920
                if expectedContainer.Name != actualContainer.Name {
8✔
1921
                        return false
1✔
1922
                }
1✔
1923

1924
                if expectedContainer.Image != actualContainer.Image {
7✔
1925
                        return false
×
1926
                }
×
1927

1928
                // compare the containers env vars except for the ones that are inserted
1929
                // by the operator by default as the SKYHOOK_RESOURCE_ID will change every
1930
                // time the skyhook is updated and would cause every pod to be removed
1931
                // TODO: This is ignoring all the static env vars that are set by operator config.
1932
                // It probably should be just SKYHOOK_RESOURCE_ID that is ignored. Otherwise,
1933
                // a user will have to manually delete the pod to update the package when operator is updated.
1934
                dummyAgentEnv := getAgentConfigEnvVars(opts, "", "", "", "")
7✔
1935
                excludedEnvs := make([]string, len(dummyAgentEnv))
7✔
1936
                for i, env := range dummyAgentEnv {
14✔
1937
                        excludedEnvs[i] = env.Name
7✔
1938
                }
7✔
1939
                expectedFilteredEnv := FilterEnv(expectedContainer.Env, excludedEnvs...)
7✔
1940
                actualFilteredEnv := FilterEnv(actualContainer.Env, excludedEnvs...)
7✔
1941
                if !reflect.DeepEqual(expectedFilteredEnv, actualFilteredEnv) {
12✔
1942
                        return false
5✔
1943
                }
5✔
1944

1945
                if !isInterrupt { // dont compare these since they are not configured on interrupt
14✔
1946
                        // compare resource requests and limits (CPU, memory, etc.)
7✔
1947
                        expectedResources := expectedContainer.Resources
7✔
1948
                        actualResources := actualContainer.Resources
7✔
1949
                        if skyhook.Spec.Packages[_package.Name].Resources != nil {
12✔
1950
                                // If CR has resources specified, they should match exactly
5✔
1951
                                if !reflect.DeepEqual(expectedResources, actualResources) {
6✔
1952
                                        return false
1✔
1953
                                }
1✔
1954
                        } else {
7✔
1955
                                // If CR has no resources specified, ensure pod has no resource overrides
7✔
1956
                                if !limitRange {
14✔
1957
                                        if actualResources.Requests != nil || actualResources.Limits != nil {
8✔
1958
                                                return false
1✔
1959
                                        }
1✔
1960
                                }
1961
                        }
1962
                }
1963
        }
1964

1965
        return true
7✔
1966
}
1967

1968
// ValidateRunningPackages deletes pods that don't match the current spec and checks if there are pods running
1969
// that don't match the node state and removes them if they exist
1970
func (r *SkyhookReconciler) ValidateRunningPackages(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
6✔
1971

6✔
1972
        update := false
6✔
1973
        errs := make([]error, 0)
6✔
1974
        // get all pods for this skyhook packages
6✔
1975
        pods, err := r.dal.GetPods(ctx,
6✔
1976
                client.MatchingLabels{
6✔
1977
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
6✔
1978
                },
6✔
1979
        )
6✔
1980
        if err != nil {
6✔
1981
                return false, fmt.Errorf("error getting pods while validating packages: %w", err)
×
1982
        }
×
1983
        if pods == nil || len(pods.Items) == 0 {
12✔
1984
                return false, nil // nothing running for this skyhook on this node
6✔
1985
        }
6✔
1986

1987
        // Initialize metrics for each stage
1988
        stages := make(map[string]map[string]map[v1alpha1.Stage]int)
6✔
1989

6✔
1990
        // group pods by node
6✔
1991
        podsbyNode := make(map[string][]corev1.Pod)
6✔
1992
        for _, pod := range pods.Items {
12✔
1993
                podsbyNode[pod.Spec.NodeName] = append(podsbyNode[pod.Spec.NodeName], pod)
6✔
1994
        }
6✔
1995

1996
        for _, node := range skyhook.GetNodes() {
12✔
1997
                nodeState, err := node.State()
6✔
1998
                if err != nil {
6✔
1999
                        return false, fmt.Errorf("error getting node state: %w", err)
×
2000
                }
×
2001

2002
                for _, pod := range podsbyNode[node.GetNode().Name] {
12✔
2003
                        found := false
6✔
2004

6✔
2005
                        runningPackage, err := GetPackage(&pod)
6✔
2006
                        if err != nil {
6✔
2007
                                errs = append(errs, fmt.Errorf("error getting package from pod [%s:%s] while validating packages: %w", pod.Namespace, pod.Name, err))
×
2008
                        }
×
2009

2010
                        // check if the package is part of the skyhook spec, if not we need to delete it
2011
                        for _, v := range skyhook.GetSkyhook().Spec.Packages {
12✔
2012
                                if podMatchesPackage(r.opts, &v, pod, skyhook.GetSkyhook(), runningPackage.Stage) {
12✔
2013
                                        found = true
6✔
2014
                                }
6✔
2015
                        }
2016

2017
                        // Increment the stage count for metrics
2018
                        if _, ok := stages[runningPackage.Name]; !ok {
12✔
2019
                                stages[runningPackage.Name] = make(map[string]map[v1alpha1.Stage]int)
6✔
2020
                                if _, ok := stages[runningPackage.Name][runningPackage.Version]; !ok {
12✔
2021
                                        stages[runningPackage.Name][runningPackage.Version] = make(map[v1alpha1.Stage]int)
6✔
2022
                                        for _, stage := range v1alpha1.Stages {
12✔
2023
                                                stages[runningPackage.Name][runningPackage.Version][stage] = 0
6✔
2024
                                        }
6✔
2025
                                }
2026
                        }
2027
                        stages[runningPackage.Name][runningPackage.Version][runningPackage.Stage]++
6✔
2028

6✔
2029
                        // uninstall is by definition not part of the skyhook spec, so we cant delete it (because it used to be but was removed, hence uninstalling it)
6✔
2030
                        if runningPackage.Stage == v1alpha1.StageUninstall {
10✔
2031
                                found = true
4✔
2032
                        }
4✔
2033

2034
                        if !found {
10✔
2035
                                update = true
4✔
2036

4✔
2037
                                err := r.InvalidPackage(ctx, &pod)
4✔
2038
                                if err != nil {
8✔
2039
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
2040
                                }
4✔
2041
                                continue
4✔
2042
                        }
2043

2044
                        // Check if package exists in node state, ie a package running that the node state doesn't know about
2045
                        // something that is often done to try to fix bad node state is to clear the node state completely
2046
                        // which if a package is running, we want to terminate it gracefully. Ofthen what leads to this is
2047
                        // the package is in a crashloop and the operator want to restart it the whole package.
2048
                        // when we apply a package it just check if there is a running package on the node for the state of the package
2049
                        // this can cause to leave a pod running in say config mode, and it there is a depends on you might not correctly
2050
                        // run thins in the correct order.
2051
                        deleteMe := false
6✔
2052
                        packageStatus, exists := nodeState[runningPackage.GetUniqueName()]
6✔
2053
                        if !exists { // package not in node state, so we need to delete it
11✔
2054
                                deleteMe = true
5✔
2055
                        } else { // package in node state, so we need to check if it's running
11✔
2056
                                // need check if the stats match, if not we need to delete it
6✔
2057
                                if packageStatus.Stage != runningPackage.Stage {
11✔
2058
                                        deleteMe = true
5✔
2059
                                }
5✔
2060
                        }
2061

2062
                        if deleteMe {
11✔
2063
                                update = true
5✔
2064
                                err := r.InvalidPackage(ctx, &pod)
5✔
2065
                                if err != nil {
9✔
2066
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
2067
                                }
4✔
2068
                        }
2069
                }
2070
        }
2071

2072
        return update, utilerrors.NewAggregate(errs)
6✔
2073
}
2074

2075
// InvalidPackage invalidates a package and updates the pod, which will trigger the pod to be deleted
2076
func (r *SkyhookReconciler) InvalidPackage(ctx context.Context, pod *corev1.Pod) error {
5✔
2077
        err := InvalidatePackage(pod)
5✔
2078
        if err != nil {
5✔
2079
                return fmt.Errorf("error invalidating package: %w", err)
×
2080
        }
×
2081

2082
        err = r.Update(ctx, pod)
5✔
2083
        if err != nil {
9✔
2084
                return fmt.Errorf("error updating pod: %w", err)
4✔
2085
        }
4✔
2086

2087
        return nil
5✔
2088
}
2089

2090
// ProcessInterrupt will check and do the interrupt if need, and returns
2091
// false means we are waiting
2092
// true means we are good to proceed
2093
func (r *SkyhookReconciler) ProcessInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (bool, error) {
6✔
2094

6✔
2095
        if !skyhookNode.HasInterrupt(*_package) {
12✔
2096
                return true, nil
6✔
2097
        }
6✔
2098

2099
        // default starting stage
2100
        stage := v1alpha1.StageApply
4✔
2101
        nextStage := skyhookNode.NextStage(_package)
4✔
2102
        if nextStage != nil {
8✔
2103
                stage = *nextStage
4✔
2104
        }
4✔
2105

2106
        // wait tell this is done if its happening
2107
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
4✔
2108
        if found && status.State == v1alpha1.StateSkipped {
8✔
2109
                return false, nil
4✔
2110
        }
4✔
2111

2112
        // Theres is a race condition when a node reboots and api cleans up the interrupt pod
2113
        // so we need to check if the pod exists and if it does, we need to recreate it
2114
        if status != nil && (status.State == v1alpha1.StateInProgress || status.State == v1alpha1.StateErroring) && status.Stage == v1alpha1.StageInterrupt {
8✔
2115
                // call interrupt to recreate the pod if missing
4✔
2116
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
4✔
2117
                if err != nil {
4✔
2118
                        return false, err
×
2119
                }
×
2120
        }
2121

2122
        // drain and cordon node before applying package that has an interrupt
2123
        if stage == v1alpha1.StageApply {
8✔
2124
                ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, _package)
4✔
2125
                if err != nil {
4✔
2126
                        return false, err
×
2127
                }
×
2128

2129
                if !ready {
8✔
2130
                        return false, nil
4✔
2131
                }
4✔
2132
        }
2133

2134
        // time to interrupt (once other packages have finished)
2135
        if stage == v1alpha1.StageInterrupt && runInterrupt {
8✔
2136
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
4✔
2137
                if err != nil {
4✔
2138
                        return false, err
×
2139
                }
×
2140

2141
                return false, nil
4✔
2142
        }
2143

2144
        //skipping
2145
        if stage == v1alpha1.StageInterrupt && !runInterrupt {
8✔
2146
                err := skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, stage, 0, _package.ContainerSHA)
4✔
2147
                if err != nil {
4✔
2148
                        return false, fmt.Errorf("error upserting to skip interrupt: %w", err)
×
2149
                }
×
2150
                return false, nil
4✔
2151
        }
2152

2153
        // wait tell this is done if its happening
2154
        if status != nil && status.Stage == v1alpha1.StageInterrupt && status.State != v1alpha1.StateComplete {
8✔
2155
                return false, nil
4✔
2156
        }
4✔
2157

2158
        return true, nil
4✔
2159
}
2160

2161
func (r *SkyhookReconciler) EnsureNodeIsReadyForInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
4✔
2162
        // cordon node
4✔
2163
        skyhookNode.Cordon()
4✔
2164

4✔
2165
        hasWork, err := r.HasNonInterruptWork(ctx, skyhookNode)
4✔
2166
        if err != nil {
4✔
2167
                return false, err
×
2168
        }
×
2169
        if hasWork { // keep waiting...
8✔
2170
                return false, nil
4✔
2171
        }
4✔
2172

2173
        ready, err := r.DrainNode(ctx, skyhookNode, _package)
4✔
2174
        if err != nil {
4✔
2175
                return false, fmt.Errorf("error draining node [%s]: %w", skyhookNode.GetNode().Name, err)
×
2176
        }
×
2177

2178
        return ready, nil
4✔
2179
}
2180

2181
// ApplyPackage starts a pod on node for the package
2182
func (r *SkyhookReconciler) ApplyPackage(ctx context.Context, logger logr.Logger, clusterState *clusterState, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, runInterrupt bool) error {
6✔
2183

6✔
2184
        if _package == nil {
6✔
2185
                return errors.New("can not apply nil package")
×
2186
        }
×
2187

2188
        // default starting stage
2189
        stage := v1alpha1.StageApply
6✔
2190

6✔
2191
        // These modes don't have anything that comes before them so we must specify them as the
6✔
2192
        // starting point. The next stage function will return nil until these modes complete.
6✔
2193
        // Config is a special case as sometimes apply will come before it and other times it wont
6✔
2194
        // which is why it needs to be here as well
6✔
2195
        if packageStatus, found := skyhookNode.PackageStatus(_package.GetUniqueName()); found {
12✔
2196
                switch packageStatus.Stage {
6✔
2197
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
6✔
2198
                        stage = packageStatus.Stage
6✔
2199
                }
2200
        }
2201

2202
        // if stage != v1alpha1.StageApply {
2203
        //         // If a node gets rest by a user, the about method will return the wrong node state. Above sources it from the skyhook status.
2204
        //         // check if the node has nothing, reset it then apply the package.
2205
        //         nodeState, err := skyhookNode.State()
2206
        //         if err != nil {
2207
        //                 return fmt.Errorf("error getting node state: %w", err)
2208
        //         }
2209

2210
        //         _, found := nodeState[_package.GetUniqueName()]
2211
        //         if !found {
2212
        //                 stage = v1alpha1.StageApply
2213
        //         }
2214
        // }
2215

2216
        nextStage := skyhookNode.NextStage(_package)
6✔
2217
        if nextStage != nil {
12✔
2218
                stage = *nextStage
6✔
2219
        }
6✔
2220

2221
        // test if pod exists, if so, bailout
2222
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
6✔
2223
        if err != nil {
6✔
2224
                return err
×
2225
        }
×
2226

2227
        // wait tell this is done if its happening
2228
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
6✔
2229

6✔
2230
        if found && status.State == v1alpha1.StateSkipped { // skipped, so nothing to do
6✔
2231
                return nil
×
2232
        }
×
2233

2234
        if found && status.State == v1alpha1.StateInProgress { // running, so do nothing atm
12✔
2235
                if exists {
12✔
2236
                        return nil
6✔
2237
                }
6✔
2238
        }
2239

2240
        if exists {
12✔
2241
                // nothing to do here, already running
6✔
2242
                return nil
6✔
2243
        }
6✔
2244

2245
        // Ensure the node metadata configmap exists before creating the pod
2246
        // This prevents a race where the pod starts before its required configmap is created
2247
        if err := r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhookNode.GetSkyhook(), skyhookNode.GetNode()); err != nil {
6✔
2248
                return fmt.Errorf("error upserting node metadata configmap: %w", err)
×
2249
        }
×
2250

2251
        pod := createPodFromPackage(r.opts, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name, stage)
6✔
2252

6✔
2253
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, stage, _package); err != nil {
6✔
2254
                return fmt.Errorf("error setting package on pod: %w", err)
×
2255
        }
×
2256

2257
        // setup ownership of the pod we created
2258
        // helps run time know what to do when something happens to this pod we are about to create
2259
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
6✔
2260
                return fmt.Errorf("error setting ownership: %w", err)
×
2261
        }
×
2262

2263
        if err := r.Create(ctx, pod); err != nil {
6✔
2264
                return fmt.Errorf("error creating pod: %w", err)
×
2265
        }
×
2266

2267
        if err = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, stage, 0, _package.ContainerSHA); err != nil {
6✔
2268
                err = fmt.Errorf("error upserting package: %w", err) // want to keep going in this case, but don't want to lose the err
×
2269
        }
×
2270

2271
        skyhookNode.SetStatus(v1alpha1.StatusInProgress)
6✔
2272

6✔
2273
        skyhookNode.GetSkyhook().AddCondition(metav1.Condition{
6✔
2274
                Type:               fmt.Sprintf("%s/ApplyPackage", v1alpha1.METADATA_PREFIX),
6✔
2275
                Status:             metav1.ConditionTrue,
6✔
2276
                ObservedGeneration: skyhookNode.GetSkyhook().Generation,
6✔
2277
                LastTransitionTime: metav1.Now(),
6✔
2278
                Reason:             "ApplyPackage",
6✔
2279
                Message:            fmt.Sprintf("Applying package [%s:%s] to node [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name),
6✔
2280
        })
6✔
2281

6✔
2282
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] from [skyhook:%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetSkyhook().Name, stage)
6✔
2283
        r.recorder.Eventf(skyhookNode.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] to node [%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name, stage)
6✔
2284

6✔
2285
        skyhookNode.GetSkyhook().Updated = true
6✔
2286

6✔
2287
        return err
6✔
2288
}
2289

2290
// HandleRuntimeRequired finds any nodes for which all runtime required Skyhooks are complete and remove their runtime required taint
2291
// Will return an error if the patching of the nodes is not possible
2292
func (r *SkyhookReconciler) HandleRuntimeRequired(ctx context.Context, clusterState *clusterState) error {
6✔
2293
        node_to_skyhooks, skyhook_node_map := groupSkyhooksByNode(clusterState)
6✔
2294
        to_remove := getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks, skyhook_node_map)
6✔
2295
        // Remove the runtime required taint from nodes in to_remove
6✔
2296
        taint_to_remove := r.opts.GetRuntimeRequiredTaint()
6✔
2297
        errs := make([]error, 0)
6✔
2298
        for _, node := range to_remove {
10✔
2299
                // check before removing taint that it even exists to begin with
4✔
2300
                if !taints.TaintExists(node.Spec.Taints, &taint_to_remove) {
8✔
2301
                        continue
4✔
2302
                }
2303
                // RemoveTaint will ALWAYS return nil for its error so no need to check it
2304
                new_node, updated, _ := taints.RemoveTaint(node, &taint_to_remove)
4✔
2305
                if updated {
8✔
2306
                        err := r.Patch(ctx, new_node, client.MergeFrom(node))
4✔
2307
                        if err != nil {
4✔
2308
                                errs = append(errs, err)
×
2309
                        }
×
2310
                }
2311
        }
2312
        if len(errs) > 0 {
6✔
2313
                return utilerrors.NewAggregate(errs)
×
2314
        }
×
2315
        return nil
6✔
2316
}
2317

2318
// Group Skyhooks by what node they target
2319
func groupSkyhooksByNode(clusterState *clusterState) (map[types.UID][]SkyhookNodes, map[types.UID]*corev1.Node) {
7✔
2320
        node_to_skyhooks := make(map[types.UID][]SkyhookNodes)
7✔
2321
        nodes := make(map[types.UID]*corev1.Node)
7✔
2322
        for _, skyhook := range clusterState.skyhooks {
14✔
2323
                // Ignore skyhooks that don't have runtime required
7✔
2324
                if !skyhook.GetSkyhook().Spec.RuntimeRequired {
14✔
2325
                        continue
7✔
2326
                }
2327
                for _, node := range skyhook.GetNodes() {
10✔
2328
                        if _, ok := node_to_skyhooks[node.GetNode().UID]; !ok {
10✔
2329
                                node_to_skyhooks[node.GetNode().UID] = make([]SkyhookNodes, 0)
5✔
2330
                                nodes[node.GetNode().UID] = node.GetNode()
5✔
2331
                        }
5✔
2332
                        node_to_skyhooks[node.GetNode().UID] = append(node_to_skyhooks[node.GetNode().UID], skyhook)
5✔
2333
                }
2334

2335
        }
2336
        return node_to_skyhooks, nodes
7✔
2337
}
2338

2339
// Get the nodes to remove runtime required taint from node that all skyhooks targeting that node have completed
2340
// Note: This checks per-node completion, not skyhook-level completion. A node's taint is removed when all
2341
// runtime-required skyhooks are complete ON THAT SPECIFIC NODE, regardless of other nodes' completion status.
2342
func getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks map[types.UID][]SkyhookNodes, nodes map[types.UID]*corev1.Node) []*corev1.Node {
7✔
2343
        to_remove := make([]*corev1.Node, 0)
7✔
2344
        for node_uid, skyhooks := range node_to_skyhooks {
12✔
2345
                node := nodes[node_uid]
5✔
2346
                all_complete := true
5✔
2347
                for _, skyhook := range skyhooks {
10✔
2348
                        // Check if THIS specific node is complete for this skyhook (not all nodes)
5✔
2349
                        _, nodeWrapper := skyhook.GetNode(node.Name)
5✔
2350
                        if nodeWrapper == nil || !nodeWrapper.IsComplete() {
10✔
2351
                                all_complete = false
5✔
2352
                                break
5✔
2353
                        }
2354
                }
2355
                if all_complete {
10✔
2356
                        to_remove = append(to_remove, node)
5✔
2357
                }
5✔
2358
        }
2359
        return to_remove
7✔
2360
}
2361

2362
// HandleAutoTaint applies the runtime-required taint to new nodes matching runtime-required
2363
// Skyhooks that have AutoTaintNewNodes enabled.
2364
func (r *SkyhookReconciler) HandleAutoTaint(ctx context.Context, clusterState *clusterState) (bool, error) {
6✔
2365
        taint_to_add := r.opts.GetRuntimeRequiredTaint()
6✔
2366
        to_taint := clusterState.getAutoTaintNodes(taint_to_add)
6✔
2367
        errs := make([]error, 0)
6✔
2368
        changed := false
6✔
2369
        for _, node := range to_taint {
10✔
2370
                newNode, updated, _ := taints.AddOrUpdateTaint(node, &taint_to_add)
4✔
2371
                if !updated {
4✔
2372
                        continue
×
2373
                }
2374
                // add annotation to indicate that the node was auto-tainted
2375
                if newNode.Annotations == nil {
4✔
2376
                        newNode.Annotations = make(map[string]string)
×
2377
                }
×
2378
                newNode.Annotations[fmt.Sprintf("%s/autoTaint_%s", v1alpha1.METADATA_PREFIX, taint_to_add.Key)] = "true"
4✔
2379

4✔
2380
                if err := r.Patch(ctx, newNode, client.MergeFrom(node)); err != nil {
4✔
2381
                        errs = append(errs, err)
×
2382
                }
×
2383
                changed = true
4✔
2384
        }
2385
        if len(errs) > 0 {
6✔
2386
                return changed, utilerrors.NewAggregate(errs)
×
2387
        }
×
2388
        return changed, nil
6✔
2389
}
2390

2391
// setPodResources sets resources for all containers and init containers in the pod if override is set, else leaves empty for LimitRange
2392
func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
7✔
2393
        if res == nil {
14✔
2394
                return
7✔
2395
        }
7✔
2396
        if !res.CPURequest.IsZero() || !res.CPULimit.IsZero() || !res.MemoryRequest.IsZero() || !res.MemoryLimit.IsZero() {
10✔
2397
                for i := range pod.Spec.InitContainers {
10✔
2398
                        pod.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{
5✔
2399
                                Limits: corev1.ResourceList{
5✔
2400
                                        corev1.ResourceCPU:    res.CPULimit,
5✔
2401
                                        corev1.ResourceMemory: res.MemoryLimit,
5✔
2402
                                },
5✔
2403
                                Requests: corev1.ResourceList{
5✔
2404
                                        corev1.ResourceCPU:    res.CPURequest,
5✔
2405
                                        corev1.ResourceMemory: res.MemoryRequest,
5✔
2406
                                },
5✔
2407
                        }
5✔
2408
                }
5✔
2409
        }
2410
}
2411

2412
// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
2413
func partitionNodesIntoCompartments(clusterState *clusterState) error {
7✔
2414
        for _, skyhook := range clusterState.skyhooks {
14✔
2415
                // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState)
7✔
2416
                if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" {
14✔
2417
                        continue
7✔
2418
                }
2419

2420
                // Skip if no compartments exist (e.g., deployment policy not found)
2421
                // The webhook should prevent this at admission time, and the controller sets a condition at runtime,
2422
                // but we guard here to prevent panics if the policy goes missing
2423
                if len(skyhook.GetCompartments()) == 0 {
4✔
2424
                        continue
1✔
2425
                }
2426

2427
                // Clear all compartments before reassigning nodes to prevent stale nodes
2428
                // This ensures nodes are only in their current compartment based on current labels
2429
                for _, compartment := range skyhook.GetCompartments() {
6✔
2430
                        compartment.ClearNodes()
3✔
2431
                }
3✔
2432

2433
                for _, node := range skyhook.GetNodes() {
6✔
2434
                        compartmentName, err := skyhook.AssignNodeToCompartment(node)
3✔
2435
                        if err != nil {
3✔
2436
                                return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
×
2437
                        }
×
2438
                        if err := skyhook.AddCompartmentNode(compartmentName, node); err != nil {
3✔
2439
                                return fmt.Errorf("error adding node %s to compartment %s: %w", node.GetNode().Name, compartmentName, err)
×
2440
                        }
×
2441
                }
2442
        }
2443

2444
        return nil
7✔
2445
}
2446

2447
// validateAndUpsertSkyhookData performs validation and configmap operations for a skyhook
2448
func (r *SkyhookReconciler) validateAndUpsertSkyhookData(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, ctrl.Result, error) {
6✔
2449
        if yes, result, err := shouldReturn(r.ValidateRunningPackages(ctx, skyhook)); yes {
11✔
2450
                return yes, result, err
5✔
2451
        }
5✔
2452

2453
        if yes, result, err := shouldReturn(r.ValidateNodeConfigmaps(ctx, skyhook.GetSkyhook().Name, skyhook.GetNodes())); yes {
10✔
2454
                return yes, result, err
4✔
2455
        }
4✔
2456

2457
        if yes, result, err := shouldReturn(r.UpsertConfigmaps(ctx, skyhook, clusterState)); yes {
10✔
2458
                return yes, result, err
4✔
2459
        }
4✔
2460

2461
        return false, ctrl.Result{}, nil
6✔
2462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc