
NVIDIA / skyhook / build 21652001474

03 Feb 2026 11:33PM UTC coverage: 80.924% (-0.01% from 80.936%)
Build triggered by a push (github); committer: lockwobr
feat(chart): add automatic Skyhook resource cleanup on helm uninstall

Add pre-delete hook to automatically clean up Skyhook and DeploymentPolicy
resources during helm uninstall, eliminating the manual step previously
required to avoid reinstall issues.

Enabled by default with configurable timeout (120s). Can be disabled with
cleanup.enabled=false.

6588 of 8141 relevant lines covered (80.92%)

4.33 hits per line

Source file: /operator/internal/controller/skyhook_controller.go (83.94% covered)
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package controller
20

21
import (
22
        "cmp"
23
        "context"
24
        "crypto/sha256"
25
        "encoding/hex"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "slices"
31
        "sort"
32
        "strings"
33
        "time"
34

35
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
36
        "github.com/NVIDIA/skyhook/operator/internal/dal"
37
        "github.com/NVIDIA/skyhook/operator/internal/version"
38
        "github.com/NVIDIA/skyhook/operator/internal/wrapper"
39
        "github.com/go-logr/logr"
40

41
        corev1 "k8s.io/api/core/v1"
42
        policyv1 "k8s.io/api/policy/v1"
43
        apierrors "k8s.io/apimachinery/pkg/api/errors"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/runtime"
47
        "k8s.io/apimachinery/pkg/types"
48
        utilerrors "k8s.io/apimachinery/pkg/util/errors"
49
        "k8s.io/client-go/tools/record"
50
        "k8s.io/kubernetes/pkg/util/taints"
51
        ctrl "sigs.k8s.io/controller-runtime"
52
        "sigs.k8s.io/controller-runtime/pkg/client"
53
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/log"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
)
58

59
const (
60
        EventsReasonSkyhookApply       = "Apply"
61
        EventsReasonSkyhookInterrupt   = "Interrupt"
62
        EventsReasonSkyhookDrain       = "Drain"
63
        EventsReasonSkyhookStateChange = "State"
64
        EventsReasonNodeReboot         = "Reboot"
65
        EventTypeNormal                = "Normal"
66
        // EventTypeWarning = "Warning"
67
        TaintUnschedulable     = corev1.TaintNodeUnschedulable
68
        InterruptContainerName = "interrupt"
69

70
        SkyhookFinalizer = "skyhook.nvidia.com/skyhook"
71
)
72

73
type SkyhookOperatorOptions struct {
74
        Namespace            string        `env:"NAMESPACE, default=skyhook"`
75
        MaxInterval          time.Duration `env:"DEFAULT_INTERVAL, default=10m"`
76
        ImagePullSecret      string        `env:"IMAGE_PULL_SECRET"`
77
        CopyDirRoot          string        `env:"COPY_DIR_ROOT, default=/var/lib/skyhook"`
78
        ReapplyOnReboot      bool          `env:"REAPPLY_ON_REBOOT, default=false"`
79
        RuntimeRequiredTaint string        `env:"RUNTIME_REQUIRED_TAINT, default=skyhook.nvidia.com=runtime-required:NoSchedule"`
80
        PauseImage           string        `env:"PAUSE_IMAGE, default=registry.k8s.io/pause:3.10"`
81
        AgentImage           string        `env:"AGENT_IMAGE, default=ghcr.io/nvidia/skyhook/agent:latest"` // TODO: this needs to be updated with a working default
82
        AgentLogRoot         string        `env:"AGENT_LOG_ROOT, default=/var/log/skyhook"`
83
}
84

85
func (o *SkyhookOperatorOptions) Validate() error {
8✔
86

8✔
87
        messages := make([]string, 0)
8✔
88
        if o.Namespace == "" {
8✔
89
                messages = append(messages, "namespace must be set")
×
90
        }
×
91
        if o.CopyDirRoot == "" {
8✔
92
                messages = append(messages, "copy dir root must be set")
×
93
        }
×
94
        if o.RuntimeRequiredTaint == "" {
8✔
95
                messages = append(messages, "runtime required taint must be set")
×
96
        }
×
97
        if o.MaxInterval < time.Minute {
9✔
98
                messages = append(messages, "max interval must be at least 1 minute")
1✔
99
        }
1✔
100

101
        // CopyDirRoot must start with /
102
        if !strings.HasPrefix(o.CopyDirRoot, "/") {
9✔
103
                messages = append(messages, "copy dir root must start with /")
1✔
104
        }
1✔
105

106
        // RuntimeRequiredTaint must be parsable and must not be a deletion
107
        _, delete, err := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
108
        if err != nil {
9✔
109
                messages = append(messages, fmt.Sprintf("runtime required taint is invalid: %s", err.Error()))
1✔
110
        }
1✔
111
        if len(delete) > 0 {
9✔
112
                messages = append(messages, "runtime required taint must not be a deletion")
1✔
113
        }
1✔
114

115
        if o.AgentImage == "" {
9✔
116
                messages = append(messages, "agent image must be set")
1✔
117
        }
1✔
118

119
        if !strings.Contains(o.AgentImage, ":") {
9✔
120
                messages = append(messages, "agent image must contain a tag")
1✔
121
        }
1✔
122

123
        if o.PauseImage == "" {
9✔
124
                messages = append(messages, "pause image must be set")
1✔
125
        }
1✔
126

127
        if !strings.Contains(o.PauseImage, ":") {
9✔
128
                messages = append(messages, "pause image must contain a tag")
1✔
129
        }
1✔
130

131
        if len(messages) > 0 {
9✔
132
                return errors.New(strings.Join(messages, ", "))
1✔
133
        }
1✔
134

135
        return nil
8✔
136
}
137
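The `env:"NAME, default=..."` tags on SkyhookOperatorOptions suggest the options are populated from environment variables by an env-tag library and then validated at startup. A minimal sketch of that wiring, assuming github.com/sethvargo/go-envconfig plus the "context" and "fmt" imports (the actual loader is not shown in this file):

```go
// Hypothetical startup wiring; go-envconfig is an assumption based on the
// struct tags above, not something confirmed by this file.
func loadOperatorOptions(ctx context.Context) (SkyhookOperatorOptions, error) {
	var opts SkyhookOperatorOptions
	// Fill fields from the process environment, falling back to the tag defaults.
	if err := envconfig.Process(ctx, &opts); err != nil {
		return opts, fmt.Errorf("reading environment: %w", err)
	}
	// Validate collects every problem into a single comma-joined error.
	if err := opts.Validate(); err != nil {
		return opts, fmt.Errorf("invalid skyhook operator options: %w", err)
	}
	return opts, nil
}
```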

138
// AgentVersion returns the image tag portion of AgentImage
139
func (o *SkyhookOperatorOptions) AgentVersion() string {
8✔
140
        parts := strings.Split(o.AgentImage, ":")
8✔
141
        return parts[len(parts)-1]
8✔
142
}
8✔
143

144
func (o *SkyhookOperatorOptions) GetRuntimeRequiredTaint() corev1.Taint {
8✔
145
        to_add, _, _ := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
146
        return to_add[0]
8✔
147
}
8✔
148

149
func (o *SkyhookOperatorOptions) GetRuntimeRequiredToleration() corev1.Toleration {
8✔
150
        taint := o.GetRuntimeRequiredTaint()
8✔
151
        return corev1.Toleration{
8✔
152
                Key:      taint.Key,
8✔
153
                Operator: corev1.TolerationOpEqual,
8✔
154
                Value:    taint.Value,
8✔
155
                Effect:   taint.Effect,
8✔
156
        }
8✔
157
}
8✔
158
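As a concrete illustration, here is a small sketch (within this package, reusing the corev1 import above) of what the default RUNTIME_REQUIRED_TAINT value resolves to, based on the key=value:effect format that taints.ParseTaints accepts:

```go
// Sketch only: shows what the default runtime-required taint and toleration
// from the options above resolve to.
func exampleRuntimeRequiredToleration() corev1.Toleration {
	opts := SkyhookOperatorOptions{
		RuntimeRequiredTaint: "skyhook.nvidia.com=runtime-required:NoSchedule",
	}
	// GetRuntimeRequiredTaint parses this string into Key "skyhook.nvidia.com",
	// Value "runtime-required", Effect NoSchedule.
	// The toleration mirrors the taint with Operator "Equal", so skyhook-managed
	// pods can still schedule onto nodes carrying the runtime-required taint.
	return opts.GetRuntimeRequiredToleration()
}
```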

159
// compile-time check that SkyhookReconciler implements reconcile.Reconciler
160
var _ reconcile.Reconciler = &SkyhookReconciler{}
161

162
func NewSkyhookReconciler(schema *runtime.Scheme, c client.Client, recorder record.EventRecorder, opts SkyhookOperatorOptions) (*SkyhookReconciler, error) {
8✔
163

8✔
164
        err := opts.Validate()
8✔
165
        if err != nil {
8✔
166
                return nil, fmt.Errorf("invalid skyhook operator options: %w", err)
×
167
        }
×
168

169
        return &SkyhookReconciler{
8✔
170
                Client:   c,
8✔
171
                scheme:   schema,
8✔
172
                recorder: recorder,
8✔
173
                opts:     opts,
8✔
174
                dal:      dal.New(c),
8✔
175
        }, nil
8✔
176
}
177

178
// SkyhookReconciler reconciles a Skyhook object
179
type SkyhookReconciler struct {
180
        client.Client
181
        scheme   *runtime.Scheme
182
        recorder record.EventRecorder
183
        opts     SkyhookOperatorOptions
184
        dal      dal.DAL
185
}
186

187
// SetupWithManager sets up the controller with the Manager.
188
func (r *SkyhookReconciler) SetupWithManager(mgr ctrl.Manager) error {
8✔
189

8✔
190
        // indexes allow queries on these fields to use the local cache
8✔
191
        indexer := mgr.GetFieldIndexer()
8✔
192
        err := indexer.
8✔
193
                IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
15✔
194
                        pod, ok := o.(*corev1.Pod)
7✔
195
                        if !ok {
7✔
196
                                return nil
×
197
                        }
×
198
                        return []string{pod.Spec.NodeName}
7✔
199
                })
200

201
        if err != nil {
8✔
202
                return err
×
203
        }
×
204

205
        ehandler := &eventHandler{
8✔
206
                logger: mgr.GetLogger(),
8✔
207
                dal:    dal.New(r.Client),
8✔
208
        }
8✔
209

8✔
210
        return ctrl.NewControllerManagedBy(mgr).
8✔
211
                For(&v1alpha1.Skyhook{}).
8✔
212
                Watches(
8✔
213
                        &corev1.Pod{},
8✔
214
                        handler.EnqueueRequestsFromMapFunc(podHandlerFunc),
8✔
215
                ).
8✔
216
                Watches(
8✔
217
                        &corev1.Node{},
8✔
218
                        ehandler,
8✔
219
                ).
8✔
220
                Complete(r)
8✔
221
}
222

223
// CRD Permissions
224
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks,verbs=get;list;watch;create;update;patch;delete
225
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/status,verbs=get;update;patch
226
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/finalizers,verbs=update
227
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=deploymentpolicies,verbs=get;list;watch
228

229
// core permissions
230
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;update;patch;watch
231
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
232
//+kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
233
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
234
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
235

236
// Reconcile is part of the main kubernetes reconciliation loop which aims to
237
// move the current state of the cluster closer to the desired state.
238
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile
239
func (r *SkyhookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
8✔
240
        logger := log.FromContext(ctx)
8✔
241
        // split off requests for pods
8✔
242
        if strings.HasPrefix(req.Name, "pod---") {
15✔
243
                name := strings.Split(req.Name, "pod---")[1]
7✔
244
                pod, err := r.dal.GetPod(ctx, req.Namespace, name)
7✔
245
                if err == nil && pod != nil { // if this is a pod, reconcile it; otherwise it is not a pod
14✔
246
                        return r.PodReconcile(ctx, pod)
7✔
247
                }
7✔
248
                return ctrl.Result{}, err
7✔
249
        }
250

251
        // get all skyhooks (SCR)
252
        skyhooks, err := r.dal.GetSkyhooks(ctx)
8✔
253
        if err != nil {
8✔
254
                // error, going to requeue and backoff
×
255
                logger.Error(err, "error getting skyhooks")
×
256
                return ctrl.Result{}, err
×
257
        }
×
258

259
        // if there are no skyhooks, there is nothing to do, so don't requeue
260
        if skyhooks == nil || len(skyhooks.Items) == 0 {
16✔
261
                return ctrl.Result{}, nil
8✔
262
        }
8✔
263

264
        // get all nodes
265
        nodes, err := r.dal.GetNodes(ctx)
7✔
266
        if err != nil {
7✔
267
                // error, going to requeue and backoff
×
268
                logger.Error(err, "error getting nodes")
×
269
                return ctrl.Result{}, err
×
270
        }
×
271

272
        // if there are no nodes, there is no work to do either
273
        if nodes == nil || len(nodes.Items) == 0 {
7✔
274
                // no nodes, so nothing to do
×
275
                return ctrl.Result{}, nil
×
276
        }
×
277

278
        // get all deployment policies
279
        deploymentPolicies, err := r.dal.GetDeploymentPolicies(ctx)
7✔
280
        if err != nil {
7✔
281
                logger.Error(err, "error getting deployment policies")
×
282
                return ctrl.Result{}, err
×
283
        }
×
284

285
        // TODO: building this state could error in a lot of ways, and we might want to move towards partial state,
286
        // meaning if we can't build state for one SCR, we still process the others and report an error for that one
287

288
        // BUILD cluster state from all skyhooks, and all nodes
289
        // this filters and pairs nodes to skyhooks, and also provides helper methods for introspection and mutation
290
        clusterState, err := BuildState(skyhooks, nodes, deploymentPolicies)
7✔
291
        if err != nil {
7✔
292
                // error, going to requeue and backoff
×
293
                logger.Error(err, "error building cluster state")
×
294
                return ctrl.Result{}, err
×
295
        }
×
296

297
        if yes, result, err := shouldReturn(r.HandleMigrations(ctx, clusterState)); yes {
14✔
298
                return result, err
7✔
299
        }
7✔
300

301
        if yes, result, err := shouldReturn(r.TrackReboots(ctx, clusterState)); yes {
14✔
302
                return result, err
7✔
303
        }
7✔
304

305
        // the node picker selects nodes to do work on and tries to maintain node priority between SCRs
306
        nodePicker := NewNodePicker(logger, r.opts.GetRuntimeRequiredToleration())
7✔
307

7✔
308
        errs := make([]error, 0)
7✔
309
        var result *ctrl.Result
7✔
310

7✔
311
        for _, skyhook := range clusterState.skyhooks {
14✔
312
                if yes, result, err := shouldReturn(r.HandleFinalizer(ctx, skyhook)); yes {
14✔
313
                        return result, err
7✔
314
                }
7✔
315

316
                if yes, result, err := shouldReturn(r.ReportState(ctx, clusterState, skyhook)); yes {
14✔
317
                        return result, err
7✔
318
                }
7✔
319

320
                if skyhook.IsPaused() {
13✔
321
                        if yes, result, err := shouldReturn(r.UpdatePauseStatus(ctx, clusterState, skyhook)); yes {
12✔
322
                                return result, err
6✔
323
                        }
6✔
324
                        continue
6✔
325
                }
326

327
                if yes, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState); yes {
13✔
328
                        return result, err
6✔
329
                }
6✔
330

331
                changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
332
                if changed {
14✔
333
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
334
                        if len(errs) > 0 {
12✔
335
                                return ctrl.Result{RequeueAfter: time.Second * 2}, utilerrors.NewAggregate(errs)
5✔
336
                        }
5✔
337
                        return ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
338
                }
339

340
                _, err := HandleVersionChange(skyhook)
7✔
341
                if err != nil {
7✔
342
                        return ctrl.Result{RequeueAfter: time.Second * 2}, fmt.Errorf("error getting packages to uninstall: %w", err)
×
343
                }
×
344
        }
345

346
        // Process all non-complete, non-disabled skyhooks (in priority order)
347
        // Each skyhook is processed only for nodes that are ready (all higher-priority skyhooks complete on that node)
348
        // This enables per-node priority ordering: nodes can progress independently
349
        result, err = r.processSkyhooksPerNode(ctx, clusterState, nodePicker, logger)
7✔
350
        if err != nil {
12✔
351
                errs = append(errs, err)
5✔
352
        }
5✔
353

354
        err = r.HandleRuntimeRequired(ctx, clusterState)
7✔
355
        if err != nil {
7✔
356
                errs = append(errs, err)
×
357
        }
×
358

359
        if len(errs) > 0 {
12✔
360
                err := utilerrors.NewAggregate(errs)
5✔
361
                return ctrl.Result{}, err
5✔
362
        }
5✔
363

364
        if result != nil {
14✔
365
                return *result, nil
7✔
366
        }
7✔
367

368
        // default happy path: requeue after the max interval
369
        return ctrl.Result{RequeueAfter: r.opts.MaxInterval}, nil
7✔
370
}
371

372
// processSkyhooksPerNode processes all skyhooks for nodes that are ready (per-node priority ordering).
373
// A node is ready for a skyhook if all higher-priority skyhooks are complete on that specific node.
374
func (r *SkyhookReconciler) processSkyhooksPerNode(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, logger logr.Logger) (*ctrl.Result, error) {
7✔
375
        var result *ctrl.Result
7✔
376
        var errs []error
7✔
377

7✔
378
        for _, skyhook := range clusterState.skyhooks {
14✔
379
                if skyhook.IsComplete() || skyhook.IsDisabled() || skyhook.IsPaused() {
14✔
380
                        continue
7✔
381
                }
382

383
                // Check if any nodes are ready for this skyhook
384
                if !hasReadyNodesForSkyhook(skyhook, clusterState.skyhooks) {
12✔
385
                        continue
5✔
386
                }
387

388
                res, err := r.RunSkyhookPackages(ctx, clusterState, nodePicker, skyhook)
7✔
389
                if err != nil {
12✔
390
                        logger.Error(err, "error processing skyhook", "skyhook", skyhook.GetSkyhook().Name)
5✔
391
                        errs = append(errs, err)
5✔
392
                }
5✔
393
                if res != nil {
14✔
394
                        result = res
7✔
395
                }
7✔
396
        }
397

398
        if len(errs) > 0 {
12✔
399
                return result, utilerrors.NewAggregate(errs)
5✔
400
        }
5✔
401
        return result, nil
7✔
402
}
403

404
// hasReadyNodesForSkyhook checks if any nodes are ready to process this skyhook.
405
// A node is ready if it's not complete and all higher-priority skyhooks are complete on that node.
406
func hasReadyNodesForSkyhook(skyhook SkyhookNodes, allSkyhooks []SkyhookNodes) bool {
7✔
407
        for _, node := range skyhook.GetNodes() {
14✔
408
                if !node.IsComplete() && IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, allSkyhooks) {
14✔
409
                        return true
7✔
410
                }
7✔
411
        }
412
        return false
5✔
413
}
414

415
func shouldReturn(updates bool, err error) (bool, ctrl.Result, error) {
7✔
416
        if err != nil {
14✔
417
                return true, ctrl.Result{}, err
7✔
418
        }
7✔
419
        if updates {
14✔
420
                return true, ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
421
        }
7✔
422
        return false, ctrl.Result{}, nil
7✔
423
}
424

425
func (r *SkyhookReconciler) HandleMigrations(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
426

7✔
427
        updates := false
7✔
428

7✔
429
        if version.VERSION == "" {
7✔
430
                // this means the binary was compiled without version information
×
431
                return false, nil
×
432
        }
×
433

434
        logger := log.FromContext(ctx)
7✔
435
        errors := make([]error, 0)
7✔
436
        for _, skyhook := range clusterState.skyhooks {
14✔
437

7✔
438
                err := skyhook.Migrate(logger)
7✔
439
                if err != nil {
7✔
440
                        return false, fmt.Errorf("error migrating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
441
                }
×
442

443
                if err := skyhook.GetSkyhook().Skyhook.Validate(); err != nil {
7✔
444
                        return false, fmt.Errorf("error validating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
445
                }
×
446

447
                for _, node := range skyhook.GetNodes() {
14✔
448
                        if node.Changed() {
14✔
449
                                err := r.Status().Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
450
                                if err != nil {
7✔
451
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
452
                                }
×
453

454
                                err = r.Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
455
                                if err != nil {
7✔
456
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
457
                                }
×
458
                                updates = true
7✔
459
                        }
460
                }
461

462
                if skyhook.GetSkyhook().Updated {
14✔
463
                        // need to do this because SaveNodesAndSkyhook only saves skyhook status, not the main skyhook object where the annotations are
7✔
464
                        // additionally it needs to be an update: a patch nils out the annotations for some reason, and the save function does a patch
7✔
465

7✔
466
                        if err = r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
467
                                return false, fmt.Errorf("error updating during migration skyhook status [%s]: %w", skyhook.GetSkyhook().Name, err)
7✔
468
                        }
7✔
469

470
                        // because of conflict issues (409) we need to do things a bit differently here.
471
                        // We might be able to use server side apply in the future, but for now we need to do this
472
                        // https://kubernetes.io/docs/reference/using-api/server-side-apply/
473
                        // https://github.com/kubernetes-sigs/controller-runtime/issues/347
474

475
                        // the workaround for now is to grab a fresh copy of the object and then patch it
476

477
                        newskyhook, err := r.dal.GetSkyhook(ctx, skyhook.GetSkyhook().Name)
7✔
478
                        if err != nil {
7✔
479
                                return false, fmt.Errorf("error getting skyhook to migrate [%s]: %w", skyhook.GetSkyhook().Name, err)
×
480
                        }
×
481
                        newPatch := client.MergeFrom(newskyhook.DeepCopy())
7✔
482

7✔
483
                        // set version
7✔
484
                        wrapper.NewSkyhookWrapper(newskyhook).SetVersion()
7✔
485

7✔
486
                        if err = r.Patch(ctx, newskyhook, newPatch); err != nil {
7✔
487
                                return false, fmt.Errorf("error updating during migration skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
488
                        }
×
489

490
                        updates = true
7✔
491
                }
492
        }
493

494
        if len(errors) > 0 {
7✔
495
                return false, utilerrors.NewAggregate(errors)
×
496
        }
×
497

498
        return updates, nil
7✔
499
}
500

501
// ReportState computes and puts important information into the skyhook status so that monitoring tools such as k9s
502
// can see the information at a glance. For example, the number of completed nodes and the list of packages in the skyhook.
503
func (r *SkyhookReconciler) ReportState(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
7✔
504

7✔
505
        // save updated state to skyhook status
7✔
506
        skyhook.ReportState()
7✔
507

7✔
508
        if skyhook.GetSkyhook().Updated {
14✔
509
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
510
                if len(errs) > 0 {
7✔
511
                        return false, utilerrors.NewAggregate(errs)
×
512
                }
×
513
                return true, nil
7✔
514
        }
515

516
        return false, nil
7✔
517
}
518

519
func (r *SkyhookReconciler) UpdatePauseStatus(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
6✔
520
        changed := UpdateSkyhookPauseStatus(skyhook)
6✔
521

6✔
522
        if changed {
12✔
523
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
524
                if len(errs) > 0 {
6✔
525
                        return false, utilerrors.NewAggregate(errs)
×
526
                }
×
527
                return true, nil
6✔
528
        }
529

530
        return false, nil
6✔
531
}
532

533
func (r *SkyhookReconciler) TrackReboots(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
534

7✔
535
        updates := false
7✔
536
        errs := make([]error, 0)
7✔
537

7✔
538
        for _, skyhook := range clusterState.skyhooks {
14✔
539
                if skyhook.GetSkyhook().Status.NodeBootIds == nil {
14✔
540
                        skyhook.GetSkyhook().Status.NodeBootIds = make(map[string]string)
7✔
541
                }
7✔
542

543
                for _, node := range skyhook.GetNodes() {
14✔
544
                        id, ok := skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name]
7✔
545

7✔
546
                        if !ok { // new node
14✔
547
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
7✔
548
                                skyhook.GetSkyhook().Updated = true
7✔
549
                        }
7✔
550

551
                        if id != "" && id != node.GetNode().Status.NodeInfo.BootID { // node rebooted
7✔
552
                                if r.opts.ReapplyOnReboot {
×
553
                                        r.recorder.Eventf(skyhook.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node [%s] to be reapplied", node.GetNode().Name)
×
554
                                        r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node for [%s] to be reapplied", node.GetSkyhook().Name)
×
555
                                        node.Reset()
×
556
                                }
×
557
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
×
558
                                skyhook.GetSkyhook().Updated = true
×
559
                        }
560

561
                        if node.Changed() { // update
7✔
562
                                updates = true
×
563
                                err := r.Update(ctx, node.GetNode())
×
564
                                if err != nil {
×
565
                                        errs = append(errs, fmt.Errorf("error updating node after reboot [%s]: %w", node.GetNode().Name, err))
×
566
                                }
×
567
                        }
568
                }
569
                if skyhook.GetSkyhook().Updated { // update
14✔
570
                        updates = true
7✔
571
                        err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook)
7✔
572
                        if err != nil {
13✔
573
                                errs = append(errs, fmt.Errorf("error updating skyhook status after reboot [%s]: %w", skyhook.GetSkyhook().Name, err))
6✔
574
                        }
6✔
575
                }
576
        }
577

578
        return updates, utilerrors.NewAggregate(errs)
7✔
579
}
580

581
// RunSkyhookPackages runs all skyhook packages then saves and requeues if changes were made
582
func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, skyhook SkyhookNodes) (*ctrl.Result, error) {
7✔
583

7✔
584
        logger := log.FromContext(ctx)
7✔
585
        requeue := false
7✔
586

7✔
587
        toUninstall, err := HandleVersionChange(skyhook)
7✔
588
        if err != nil {
7✔
589
                return nil, fmt.Errorf("error getting packages to uninstall: %w", err)
×
590
        }
×
591

592
        changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
593
        if !changed && skyhook.IsComplete() {
7✔
594
                return nil, nil
×
595
        }
×
596

597
        selectedNode := nodePicker.SelectNodes(skyhook)
7✔
598

7✔
599
        for _, node := range selectedNode {
14✔
600
                // Skip nodes that are waiting on higher-priority skyhooks
7✔
601
                // This enables per-node priority ordering
7✔
602
                if !IsNodeReadyForSkyhook(node.GetNode().Name, skyhook, clusterState.skyhooks) {
12✔
603
                        continue
5✔
604
                }
605

606
                if node.IsComplete() && !node.Changed() {
7✔
607
                        continue
×
608
                }
609

610
                toRun, err := node.RunNext()
7✔
611
                if err != nil {
7✔
612
                        return nil, fmt.Errorf("error getting next packages to run: %w", err)
×
613
                }
×
614

615
                // prepend the uninstall packages so they are run first
616
                toRun = append(toUninstall, toRun...)
7✔
617

7✔
618
                interrupt, pack := fudgeInterruptWithPriority(toRun, skyhook.GetSkyhook().GetConfigUpdates(), skyhook.GetSkyhook().GetConfigInterrupts())
7✔
619

7✔
620
                for _, f := range toRun {
14✔
621

7✔
622
                        ok, err := r.ProcessInterrupt(ctx, node, f, interrupt, interrupt != nil && f.Name == pack)
7✔
623
                        if err != nil {
7✔
624
                                // TODO: handle this error
×
625
                                return nil, fmt.Errorf("error processing if we should interrupt [%s:%s]: %w", f.Name, f.Version, err)
×
626
                        }
×
627
                        if !ok {
12✔
628
                                requeue = true
5✔
629
                                continue
5✔
630
                        }
631

632
                        err = r.ApplyPackage(ctx, logger, clusterState, node, f, interrupt != nil && f.Name == pack)
7✔
633
                        if err != nil {
7✔
634
                                return nil, fmt.Errorf("error applying package [%s:%s]: %w", f.Name, f.Version, err)
×
635
                        }
×
636

637
                        // process one package at a time
638
                        if skyhook.GetSkyhook().Spec.Serial {
7✔
639
                                return &ctrl.Result{Requeue: true}, nil
×
640
                        }
×
641
                }
642
        }
643

644
        saved, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
645
        if len(errs) > 0 {
12✔
646
                return &ctrl.Result{}, utilerrors.NewAggregate(errs)
5✔
647
        }
5✔
648
        if saved {
14✔
649
                requeue = true
7✔
650
        }
7✔
651

652
        if !skyhook.IsComplete() || requeue {
14✔
653
                return &ctrl.Result{RequeueAfter: time.Second * 2}, nil // not sure this is better than just a requeue bool
7✔
654
        }
7✔
655

656
        return nil, utilerrors.NewAggregate(errs)
×
657
}
658

659
// SaveNodesAndSkyhook saves the nodes and the skyhook, and emits events if the skyhook status changes
660
func (r *SkyhookReconciler) SaveNodesAndSkyhook(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, []error) {
7✔
661
        saved := false
7✔
662
        errs := make([]error, 0)
7✔
663

7✔
664
        for _, node := range skyhook.GetNodes() {
14✔
665
                patch := client.StrategicMergeFrom(clusterState.tracker.GetOriginal(node.GetNode()))
7✔
666
                if node.Changed() {
14✔
667
                        err := r.Patch(ctx, node.GetNode(), patch)
7✔
668
                        if err != nil {
7✔
669
                                errs = append(errs, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
670
                        }
×
671
                        saved = true
7✔
672

7✔
673
                        err = r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhook.GetSkyhook(), node.GetNode())
7✔
674
                        if err != nil {
7✔
675
                                errs = append(errs, fmt.Errorf("error upserting labels, annotations, and packages config map for node [%s]: %w", node.GetNode().Name, err))
×
676
                        }
×
677

678
                        if node.IsComplete() {
14✔
679
                                r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook [%s] complete.", skyhook.GetSkyhook().Name)
7✔
680

7✔
681
                                // since node is complete remove from priority
7✔
682
                                if _, ok := skyhook.GetSkyhook().Status.NodePriority[node.GetNode().Name]; ok {
14✔
683
                                        delete(skyhook.GetSkyhook().Status.NodePriority, node.GetNode().Name)
7✔
684
                                        skyhook.GetSkyhook().Updated = true
7✔
685
                                }
7✔
686
                        }
687
                }
688

689
                // updates node's condition
690
                node.UpdateCondition()
7✔
691
                if node.Changed() {
14✔
692
                        // conditions are in status
7✔
693
                        err := r.Status().Patch(ctx, node.GetNode(), patch)
7✔
694
                        if err != nil {
12✔
695
                                errs = append(errs, fmt.Errorf("error patching node status [%s]: %w", node.GetNode().Name, err))
5✔
696
                        }
5✔
697
                        saved = true
7✔
698
                }
699

700
                if node.GetSkyhook() != nil && node.GetSkyhook().Updated {
14✔
701
                        skyhook.GetSkyhook().Updated = true
7✔
702
                }
7✔
703
        }
704

705
        if skyhook.GetSkyhook().Updated {
14✔
706
                patch := client.MergeFrom(clusterState.tracker.GetOriginal(skyhook.GetSkyhook().Skyhook))
7✔
707
                err := r.Status().Patch(ctx, skyhook.GetSkyhook().Skyhook, patch)
7✔
708
                if err != nil {
7✔
709
                        errs = append(errs, err)
×
710
                }
×
711
                saved = true
7✔
712

7✔
713
                if skyhook.GetPriorStatus() != "" && skyhook.GetPriorStatus() != skyhook.Status() {
14✔
714
                        // we transitioned, fire event
7✔
715
                        r.recorder.Eventf(skyhook.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook transitioned [%s] -> [%s]", skyhook.GetPriorStatus(), skyhook.Status())
7✔
716
                }
7✔
717
        }
718

719
        if len(errs) > 0 {
12✔
720
                saved = false
5✔
721
        }
5✔
722
        return saved, errs
7✔
723
}
724

725
// HandleVersionChange updates the state for the node or skyhook if a version is changed on a package
726
func HandleVersionChange(skyhook SkyhookNodes) ([]*v1alpha1.Package, error) {
7✔
727
        toUninstall := make([]*v1alpha1.Package, 0)
7✔
728

7✔
729
        for _, node := range skyhook.GetNodes() {
14✔
730
                nodeState, err := node.State()
7✔
731
                if err != nil {
7✔
732
                        return nil, err
×
733
                }
×
734

735
                for _, packageStatus := range nodeState {
14✔
736
                        upgrade := false
7✔
737

7✔
738
                        _package, exists := skyhook.GetSkyhook().Spec.Packages[packageStatus.Name]
7✔
739
                        if exists && _package.Version == packageStatus.Version {
14✔
740
                                continue // no uninstall needed for package
7✔
741
                        }
742

743
                        packageStatusRef := v1alpha1.PackageRef{
5✔
744
                                Name:    packageStatus.Name,
5✔
745
                                Version: packageStatus.Version,
5✔
746
                        }
5✔
747

5✔
748
                        if !exists && packageStatus.Stage != v1alpha1.StageUninstall {
10✔
749
                                // Start uninstall of old package
5✔
750
                                err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
751
                                if err != nil {
5✔
752
                                        return nil, fmt.Errorf("error updating node status: %w", err)
×
753
                                }
×
754
                        } else if exists && _package.Version != packageStatus.Version {
10✔
755
                                comparison := version.Compare(_package.Version, packageStatus.Version)
5✔
756
                                if comparison == -2 {
5✔
757
                                        return nil, errors.New("error comparing package versions: invalid version string provided enabling webhooks validates versions before being applied")
×
758
                                }
×
759

760
                                if comparison == 1 {
10✔
761
                                        _packageStatus, found := node.PackageStatus(_package.GetUniqueName())
5✔
762
                                        if found && _packageStatus.Stage == v1alpha1.StageUpgrade {
10✔
763
                                                continue
5✔
764
                                        }
765

766
                                        // start upgrade of package
767
                                        err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageUpgrade, 0, _package.ContainerSHA)
5✔
768
                                        if err != nil {
5✔
769
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
770
                                        }
×
771

772
                                        upgrade = true
5✔
773
                                } else if comparison == -1 && packageStatus.Stage != v1alpha1.StageUninstall {
10✔
774
                                        // Start uninstall of old package
5✔
775
                                        err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
776
                                        if err != nil {
5✔
777
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
778
                                        }
×
779

780
                                // If the version changed, mark the new version as skipped so it waits until the uninstall completes
781
                                        err = node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, v1alpha1.StageUninstall, 0, _package.ContainerSHA)
5✔
782
                                        if err != nil {
5✔
783
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
784
                                        }
×
785
                                }
786
                        }
787

788
                        // only need to create a faux package for uninstall since it won't be in the DAG (Upgrade will)
789
                        newPackageStatus, found := node.PackageStatus(packageStatusRef.GetUniqueName())
5✔
790
                        if !upgrade && found && newPackageStatus.Stage == v1alpha1.StageUninstall && newPackageStatus.State == v1alpha1.StateInProgress {
10✔
791
                                // create fake package with the info we can salvage from the node state
5✔
792
                                newPackage := &v1alpha1.Package{
5✔
793
                                        PackageRef: packageStatusRef,
5✔
794
                                        Image:      packageStatus.Image,
5✔
795
                                }
5✔
796

5✔
797
                                // Add package to uninstall list if it's not already present
5✔
798
                                found := false
5✔
799
                                for _, uninstallPackage := range toUninstall {
10✔
800
                                        if reflect.DeepEqual(uninstallPackage, newPackage) {
5✔
801
                                                found = true
×
802
                                        }
×
803
                                }
804

805
                                if !found {
10✔
806
                                        toUninstall = append(toUninstall, newPackage)
5✔
807
                                }
5✔
808
                        }
809

810
                        // remove all config updates for the package since it's being uninstalled or
811
                        // upgraded. NOTE: The config updates must be removed whenever the version changes
812
                        // or else the package interrupt may be skipped if there is one
813
                        skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
814

5✔
815
                        // set the node and skyhook status to in progress
5✔
816
                        node.SetStatus(v1alpha1.StatusInProgress)
5✔
817
                }
818
        }
819

820
        return toUninstall, nil
7✔
821
}
822

823
// helper to get a pointer to a value
824
func ptr[E any](e E) *E {
8✔
825
        return &e
8✔
826
}
8✔
827

828
// generateSafeName generates a consistent name for Kubernetes resources that is unique
829
// while staying within the specified character limit
830
func generateSafeName(maxLen int, nameParts ...string) string {
8✔
831
        name := strings.Join(nameParts, "-")
8✔
832
        // Replace dots with dashes as they're not allowed in resource names
8✔
833
        name = strings.ReplaceAll(name, ".", "-")
8✔
834

8✔
835
        unique := sha256.Sum256([]byte(name))
8✔
836
        uniqueStr := hex.EncodeToString(unique[:])[:8]
8✔
837

8✔
838
        maxlen := maxLen - len(uniqueStr) - 1
8✔
839
        if len(name) > maxlen {
15✔
840
                name = name[:maxlen]
7✔
841
        }
7✔
842

843
        return strings.ToLower(fmt.Sprintf("%s-%s", name, uniqueStr))
8✔
844
}
845
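For illustration, a short usage sketch of generateSafeName with made-up skyhook and node names (the real call sites appear below, e.g. the per-node metadata ConfigMap name):

```go
// Illustrative only: the skyhook and node names here are hypothetical.
func exampleMetadataConfigMapName() string {
	// Dots are replaced with dashes, the joined name is truncated so the final
	// string stays within 253 characters, and an 8-hex-character sha256
	// fingerprint of the pre-truncation name is appended for uniqueness,
	// e.g. "tuning-gpu-node-01-example-com-metadata-<8 hex chars>".
	return generateSafeName(253, "tuning", "gpu-node-01.example.com", "metadata")
}
```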

846
func (r *SkyhookReconciler) UpsertNodeLabelsAnnotationsPackages(ctx context.Context, skyhook *wrapper.Skyhook, node *corev1.Node) error {
8✔
847
        // No work to do if there are no labels or annotations for the node
8✔
848
        if len(node.Labels) == 0 && len(node.Annotations) == 0 {
8✔
849
                return nil
×
850
        }
×
851

852
        annotations, err := json.Marshal(node.Annotations)
8✔
853
        if err != nil {
8✔
854
                return fmt.Errorf("error converting annotations into byte array: %w", err)
×
855
        }
×
856

857
        labels, err := json.Marshal(node.Labels)
8✔
858
        if err != nil {
8✔
859
                return fmt.Errorf("error converting labels into byte array: %w", err)
×
860
        }
×
861

862
        // marshal intermediary package metadata for the agent
863
        metadata := NewSkyhookMetadata(r.opts, skyhook)
8✔
864
        packages, err := metadata.Marshal()
8✔
865
        if err != nil {
8✔
866
                return fmt.Errorf("error converting packages into byte array: %w", err)
×
867
        }
×
868

869
        configMapName := generateSafeName(253, skyhook.Name, node.Name, "metadata")
8✔
870
        newCM := &corev1.ConfigMap{
8✔
871
                ObjectMeta: metav1.ObjectMeta{
8✔
872
                        Name:      configMapName,
8✔
873
                        Namespace: r.opts.Namespace,
8✔
874
                        Labels: map[string]string{
8✔
875
                                fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhook.Name,
8✔
876
                        },
8✔
877
                        Annotations: map[string]string{
8✔
878
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
8✔
879
                                fmt.Sprintf("%s/Node.name", v1alpha1.METADATA_PREFIX): node.Name,
8✔
880
                        },
8✔
881
                },
8✔
882
                Data: map[string]string{
8✔
883
                        "annotations.json": string(annotations),
8✔
884
                        "labels.json":      string(labels),
8✔
885
                        "packages.json":    string(packages),
8✔
886
                },
8✔
887
        }
8✔
888

8✔
889
        if err := ctrl.SetControllerReference(skyhook.Skyhook, newCM, r.scheme); err != nil {
8✔
890
                return fmt.Errorf("error setting ownership: %w", err)
×
891
        }
×
892

893
        existingConfigMap := &corev1.ConfigMap{}
8✔
894
        err = r.Get(ctx, client.ObjectKey{Namespace: r.opts.Namespace, Name: configMapName}, existingConfigMap)
8✔
895
        if err != nil {
16✔
896
                if apierrors.IsNotFound(err) {
16✔
897
                        // create
8✔
898
                        err := r.Create(ctx, newCM)
8✔
899
                        if err != nil {
8✔
900
                                return fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
901
                        }
×
902
                } else {
×
903
                        return fmt.Errorf("error getting config map: %w", err)
×
904
                }
×
905
        } else {
7✔
906
                if !reflect.DeepEqual(existingConfigMap.Data, newCM.Data) {
14✔
907
                        // update
7✔
908
                        err := r.Update(ctx, newCM)
7✔
909
                        if err != nil {
7✔
910
                                return fmt.Errorf("error updating config map [%s]: %w", newCM.Name, err)
×
911
                        }
×
912
                }
913
        }
914

915
        return nil
8✔
916
}
917

918
// HandleConfigUpdates checks whether the configmap on a package was updated; if it was, the configmap is
919
// updated and the package is put into config mode if the package is complete or erroring
920
func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes, _package v1alpha1.Package, oldConfigMap, newConfigMap *corev1.ConfigMap) (bool, error) {
6✔
921
        completedNodes, nodeCount := 0, len(skyhook.GetNodes())
6✔
922
        erroringNode := false
6✔
923

6✔
924
        // if configmap changed
6✔
925
        if !reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) {
11✔
926
                for _, node := range skyhook.GetNodes() {
10✔
927
                        exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package)
5✔
928
                        if err != nil {
5✔
929
                                return false, err
×
930
                        }
×
931

932
                        if !exists && node.IsPackageComplete(_package) {
10✔
933
                                completedNodes++
5✔
934
                        }
5✔
935

936
                        // if we have an erroring node in the config, interrupt, or post-interrupt mode
937
                        // then we will restart the config changes
938
                        if packageStatus, found := node.PackageStatus(_package.GetUniqueName()); found {
10✔
939
                                switch packageStatus.Stage {
5✔
940
                                case v1alpha1.StageConfig, v1alpha1.StageInterrupt, v1alpha1.StagePostInterrupt:
5✔
941
                                        if packageStatus.State == v1alpha1.StateErroring {
5✔
942
                                                erroringNode = true
×
943

×
944
                                                // delete the erroring pod from the node so that it can be recreated
×
945
                                                // with the updated configmap
×
946
                                                pods, err := r.dal.GetPods(ctx,
×
947
                                                        client.MatchingFields{
×
948
                                                                "spec.nodeName": node.GetNode().Name,
×
949
                                                        },
×
950
                                                        client.MatchingLabels{
×
951
                                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.GetSkyhook().Name,
×
952
                                                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
×
953
                                                        },
×
954
                                                )
×
955
                                                if err != nil {
×
956
                                                        return false, err
×
957
                                                }
×
958

959
                                                if pods != nil {
×
960
                                                        for _, pod := range pods.Items {
×
961
                                                                err := r.Delete(ctx, &pod)
×
962
                                                                if err != nil {
×
963
                                                                        return false, err
×
964
                                                                }
×
965
                                                        }
966
                                                }
967
                                        }
968
                                }
969
                        }
970
                }
971

972
                // if the update is complete or there is an erroring node put the package back into
973
                // the config mode and update the config map
974
                if completedNodes == nodeCount || erroringNode {
10✔
975
                        // get the keys in the configmap that changed
5✔
976
                        newConfigUpdates := make([]string, 0)
5✔
977
                        for key, new_val := range newConfigMap.Data {
10✔
978
                                if old_val, exists := oldConfigMap.Data[key]; !exists || old_val != new_val {
10✔
979
                                        newConfigUpdates = append(newConfigUpdates, key)
5✔
980
                                }
5✔
981
                        }
982

983
                        // if updates completed then clear out old config updates as they are finished
984
                        if completedNodes == nodeCount {
10✔
985
                                skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
986
                        }
5✔
987

988
                        // Add the new changed keys to the config updates
989
                        skyhook.GetSkyhook().AddConfigUpdates(_package.Name, newConfigUpdates...)
5✔
990

5✔
991
                        for _, node := range skyhook.GetNodes() {
10✔
992
                                err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageConfig, 0, _package.ContainerSHA)
5✔
993
                                if err != nil {
5✔
994
                                        return false, fmt.Errorf("error upserting node status [%s]: %w", node.GetNode().Name, err)
×
995
                                }
×
996

997
                                node.SetStatus(v1alpha1.StatusInProgress)
5✔
998
                        }
999

1000
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
5✔
1001
                        if len(errs) > 0 {
5✔
1002
                                return false, utilerrors.NewAggregate(errs)
×
1003
                        }
×
1004

1005
                        // update config map
1006
                        err := r.Update(ctx, newConfigMap)
5✔
1007
                        if err != nil {
5✔
1008
                                return false, fmt.Errorf("error updating config map [%s]: %w", newConfigMap.Name, err)
×
1009
                        }
×
1010

1011
                        return true, nil
5✔
1012
                }
1013
        }
1014

1015
        return false, nil
6✔
1016
}
1017

1018
func (r *SkyhookReconciler) UpsertConfigmaps(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, error) {
7✔
1019
        updated := false
7✔
1020

7✔
1021
        var list corev1.ConfigMapList
7✔
1022
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name})
7✔
1023
        if err != nil {
7✔
1024
                return false, fmt.Errorf("error listing config maps while upserting: %w", err)
×
1025
        }
×
1026

1027
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
1028
        for _, cm := range list.Items {
13✔
1029
                existingCMs[cm.Name] = cm
6✔
1030
        }
6✔
1031

1032
        // clean up from an update
1033
        shouldExist := make(map[string]struct{})
7✔
1034
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
1035
                shouldExist[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))] = struct{}{}
7✔
1036
        }
7✔
1037

1038
        for k, v := range existingCMs {
13✔
1039
                if _, ok := shouldExist[k]; !ok {
11✔
1040
                        // delete
5✔
1041
                        err := r.Delete(ctx, &v)
5✔
1042
                        if err != nil {
5✔
1043
                                return false, fmt.Errorf("error deleting existing config map [%s] while upserting: %w", v.Name, err)
×
1044
                        }
×
1045
                }
1046
        }
1047

1048
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
1049
                if len(_package.ConfigMap) > 0 {
13✔
1050

6✔
1051
                        newCM := &corev1.ConfigMap{
6✔
1052
                                ObjectMeta: metav1.ObjectMeta{
6✔
1053
                                        Name:      strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version)),
6✔
1054
                                        Namespace: r.opts.Namespace,
6✔
1055
                                        Labels: map[string]string{
6✔
1056
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
6✔
1057
                                        },
6✔
1058
                                        Annotations: map[string]string{
6✔
1059
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):            skyhook.GetSkyhook().Name,
6✔
1060
                                                fmt.Sprintf("%s/Package.Name", v1alpha1.METADATA_PREFIX):    _package.Name,
6✔
1061
                                                fmt.Sprintf("%s/Package.Version", v1alpha1.METADATA_PREFIX): _package.Version,
6✔
1062
                                        },
6✔
1063
                                },
6✔
1064
                                Data: _package.ConfigMap,
6✔
1065
                        }
6✔
1066
                        // set the owner of the CM to the SCR, so the CM is cleaned up when the SCR is deleted
6✔
1067
                        if err := ctrl.SetControllerReference(skyhook.GetSkyhook().Skyhook, newCM, r.scheme); err != nil {
6✔
1068
                                return false, fmt.Errorf("error setting ownership of cm: %w", err)
×
1069
                        }
×
1070

1071
                        if existingCM, ok := existingCMs[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))]; ok {
12✔
1072
                                updatedConfigMap, err := r.HandleConfigUpdates(ctx, clusterState, skyhook, _package, &existingCM, newCM)
6✔
1073
                                if err != nil {
6✔
1074
                                        return false, fmt.Errorf("error updating config map [%s]: %s", newCM.Name, err)
×
1075
                                }
×
1076
                                if updatedConfigMap {
11✔
1077
                                        updated = true
5✔
1078
                                }
5✔
1079
                        } else {
6✔
1080
                                // create
6✔
1081
                                err := r.Create(ctx, newCM)
6✔
1082
                                if err != nil {
6✔
1083
                                        return false, fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
1084
                                }
×
1085
                        }
1086
                }
1087
        }
1088

1089
        return updated, nil
7✔
1090
}
1091

1092
func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1093

5✔
1094
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1095
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1096
        })
5✔
1097
        if err != nil {
5✔
1098
                return false, err
×
1099
        }
×
1100

1101
        if pods == nil || len(pods.Items) == 0 {
5✔
1102
                return true, nil
×
1103
        }
×
1104

1105
        // checking for any running or pending pods with no toleration to unschedulable
1106
        // if it has an unschedulable toleration we can ignore it
1107
        for _, pod := range pods.Items {
10✔
1108

5✔
1109
                if ShouldEvict(&pod) {
10✔
1110
                        return false, nil
5✔
1111
                }
5✔
1112

1113
        }
1114

1115
        return true, nil
5✔
1116
}
1117

1118
func ShouldEvict(pod *corev1.Pod) bool {
5✔
1119
        switch pod.Status.Phase {
5✔
1120
        case corev1.PodRunning, corev1.PodPending:
5✔
1121

5✔
1122
                for _, taint := range pod.Spec.Tolerations {
10✔
1123
                        switch taint.Key {
5✔
1124
                        case "node.kubernetes.io/unschedulable": // ignoring
5✔
1125
                                return false
5✔
1126
                        }
1127
                }
1128

1129
                if len(pod.ObjectMeta.OwnerReferences) > 1 {
5✔
1130
                        for _, owner := range pod.ObjectMeta.OwnerReferences {
×
1131
                                if owner.Kind == "DaemonSet" { // ignoring
×
1132
                                        return false
×
1133
                                }
×
1134
                        }
1135
                }
1136

1137
                if pod.GetNamespace() == "kube-system" {
5✔
1138
                        return false
×
1139
                }
×
1140

1141
                return true
5✔
1142
        }
1143
        return false
5✔
1144
}
1145
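// A minimal usage sketch of ShouldEvict (the pod below is invented for
// illustration): a Running or Pending pod is a drain candidate unless it
// tolerates node.kubernetes.io/unschedulable or lives in kube-system.
//
//	pod := corev1.Pod{
//		Status: corev1.PodStatus{Phase: corev1.PodRunning},
//		Spec: corev1.PodSpec{
//			Tolerations: []corev1.Toleration{
//				{Key: "node.kubernetes.io/unschedulable", Operator: corev1.TolerationOpExists},
//			},
//		},
//	}
//	_ = ShouldEvict(&pod) // false: the unschedulable toleration means drain leaves this pod alone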

1146
// HandleFinalizer returns true only if the Skyhook is being deleted and we handled it completely, else false
1147
func (r *SkyhookReconciler) HandleFinalizer(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1148
        if skyhook.GetSkyhook().DeletionTimestamp.IsZero() { // if not deleted, and does not have our finalizer, add it
14✔
1149
                if !controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1150
                        controllerutil.AddFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1151

7✔
1152
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
7✔
1153
                                return false, fmt.Errorf("error updating skyhook to add finalizer: %w", err)
×
1154
                        }
×
1155
                }
1156
        } else { // being deleted, time to handle our finalizer
7✔
1157
                if controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1158

7✔
1159
                        errs := make([]error, 0)
7✔
1160

7✔
1161
                        // zero out all the metrics related to this skyhook, both skyhook and package metrics
7✔
1162
                        zeroOutSkyhookMetrics(skyhook)
7✔
1163

7✔
1164
                        for _, node := range skyhook.GetNodes() {
13✔
1165
                                patch := client.StrategicMergeFrom(node.GetNode().DeepCopy())
6✔
1166

6✔
1167
                                node.Uncordon()
6✔
1168

6✔
1169
                                // if this doesn't change the node then don't patch
6✔
1170
                                if !node.Changed() {
12✔
1171
                                        continue
6✔
1172
                                }
1173

1174
                                err := r.Patch(ctx, node.GetNode(), patch)
4✔
1175
                                if err != nil {
4✔
1176
                                        errs = append(errs, fmt.Errorf("error patching node [%s] in finalizer: %w", node.GetNode().Name, err))
×
1177
                                }
×
1178
                        }
1179

1180
                        if len(errs) > 0 { // we errored, so we need to return an error, otherwise we would release the skyhook when we didn't finish
7✔
1181
                                return false, utilerrors.NewAggregate(errs)
×
1182
                        }
×
1183

1184
                        controllerutil.RemoveFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1185
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
10✔
1186
                                return false, fmt.Errorf("error updating skyhook removing finalizer: %w", err)
3✔
1187
                        }
3✔
1188
                        // should be 1, and now 2. we want to set ObservedGeneration up so as not to trigger any logic from this update adding the finalizer
1189
                        skyhook.GetSkyhook().Status.ObservedGeneration = skyhook.GetSkyhook().Status.ObservedGeneration + 1
7✔
1190

7✔
1191
                        if err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
1192
                                return false, fmt.Errorf("error updating skyhook status: %w", err)
7✔
1193
                        }
7✔
1194

1195
                        return true, nil
×
1196
                }
1197
        }
1198
        return false, nil
7✔
1199
}
1200

1201
// HasNonInterruptWork returns true if pods are running on the node that either are packages or match the SCR selector
1202
func (r *SkyhookReconciler) HasNonInterruptWork(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1203

5✔
1204
        selector, err := metav1.LabelSelectorAsSelector(&skyhookNode.GetSkyhook().Spec.PodNonInterruptLabels)
5✔
1205
        if err != nil {
5✔
1206
                return false, fmt.Errorf("error creating selector: %w", err)
×
1207
        }
×
1208

1209
        if selector.Empty() { // when the selector is empty it does not do any selecting, i.e. it would return all pods on the node.
10✔
1210
                return false, nil
5✔
1211
        }
5✔
1212

1213
        pods, err := r.dal.GetPods(ctx,
5✔
1214
                client.MatchingLabelsSelector{Selector: selector},
5✔
1215
                client.MatchingFields{
5✔
1216
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1217
                },
5✔
1218
        )
5✔
1219
        if err != nil {
5✔
1220
                return false, fmt.Errorf("error getting pods: %w", err)
×
1221
        }
×
1222

1223
        if pods == nil || len(pods.Items) == 0 {
10✔
1224
                return false, nil
5✔
1225
        }
5✔
1226

1227
        for _, pod := range pods.Items {
10✔
1228
                switch pod.Status.Phase {
5✔
1229
                case corev1.PodRunning, corev1.PodPending:
5✔
1230
                        return true, nil
5✔
1231
                }
1232
        }
1233

1234
        return false, nil
1✔
1235
}
1236
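// A small sketch of the selector handling above (label values are invented):
// an empty PodNonInterruptLabels selector matches everything, so it is treated
// as "not configured" and the check returns false without listing pods.
//
//	sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{})
//	_ = sel.Empty() // true -> HasNonInterruptWork reports no protected work
//
// With something like matchLabels {"workload": "critical"} configured, any
// Running or Pending pod on the node carrying that label keeps interrupts waiting.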

1237
func (r *SkyhookReconciler) HasRunningPackages(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1238
        pods, err := r.dal.GetPods(ctx,
5✔
1239
                client.HasLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX)},
5✔
1240
                client.MatchingFields{
5✔
1241
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1242
                },
5✔
1243
        )
5✔
1244
        if err != nil {
5✔
1245
                return false, fmt.Errorf("error getting pods: %w", err)
×
1246
        }
×
1247

1248
        return pods != nil && len(pods.Items) > 0, nil
5✔
1249
}
1250

1251
func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
5✔
1252
        drained, err := r.IsDrained(ctx, skyhookNode)
5✔
1253
        if err != nil {
5✔
1254
                return false, err
×
1255
        }
×
1256
        if drained {
10✔
1257
                return true, nil
5✔
1258
        }
5✔
1259

1260
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1261
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1262
        })
5✔
1263
        if err != nil {
5✔
1264
                return false, err
×
1265
        }
×
1266

1267
        if pods == nil || len(pods.Items) == 0 {
5✔
1268
                return true, nil
×
1269
        }
×
1270

1271
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1272
                "draining node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1273
                skyhookNode.GetNode().Name,
5✔
1274
                _package.Name,
5✔
1275
                _package.Version,
5✔
1276
                skyhookNode.GetSkyhook().Name,
5✔
1277
        )
5✔
1278

5✔
1279
        errs := make([]error, 0)
5✔
1280
        for _, pod := range pods.Items {
10✔
1281

5✔
1282
                if ShouldEvict(&pod) {
10✔
1283
                        eviction := policyv1.Eviction{}
5✔
1284
                        err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction)
5✔
1285
                        if err != nil {
5✔
1286
                                errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err))
×
1287
                        }
×
1288
                }
1289
        }
1290

1291
        return len(errs) == 0, utilerrors.NewAggregate(errs)
5✔
1292
}
1293

1294
// Interrupt should not be called unless it is safe to do so, i.e. the node is already cordoned and drained
1295
func (r *SkyhookReconciler) Interrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, _interrupt *v1alpha1.Interrupt) error {
5✔
1296

5✔
1297
        hasPackagesRunning, err := r.HasRunningPackages(ctx, skyhookNode)
5✔
1298
        if err != nil {
5✔
1299
                return err
×
1300
        }
×
1301

1302
        if hasPackagesRunning { // keep waiting...
10✔
1303
                return nil
5✔
1304
        }
5✔
1305

1306
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
5✔
1307
        if err != nil {
5✔
1308
                return err
×
1309
        }
×
1310
        if exists {
5✔
1311
                // nothing to do here, already running
×
1312
                return nil
×
1313
        }
×
1314

1315
        argEncode, err := _interrupt.ToArgs()
5✔
1316
        if err != nil {
5✔
1317
                return fmt.Errorf("error creating interrupt args: %w", err)
×
1318
        }
×
1319

1320
        pod := createInterruptPodForPackage(r.opts, _interrupt, argEncode, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name)
5✔
1321

5✔
1322
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, v1alpha1.StageInterrupt, _package); err != nil {
5✔
1323
                return fmt.Errorf("error setting package on interrupt: %w", err)
×
1324
        }
×
1325

1326
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
5✔
1327
                return fmt.Errorf("error setting ownership: %w", err)
×
1328
        }
×
1329

1330
        if err := r.Create(ctx, pod); err != nil {
5✔
1331
                return fmt.Errorf("error creating interruption pod: %w", err)
×
1332
        }
×
1333

1334
        _ = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageInterrupt, 0, _package.ContainerSHA)
5✔
1335

5✔
1336
        r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1337
                "Interrupting node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1338
                skyhookNode.GetNode().Name,
5✔
1339
                _package.Name,
5✔
1340
                _package.Version,
5✔
1341
                skyhookNode.GetSkyhook().Name)
5✔
1342

5✔
1343
        return nil
5✔
1344
}
1345

1346
// fudgeInterruptWithPriority takes a list of packages, interrupts, and configUpdates and returns the correct merged interrupt to run to handle all the packages
1347
func fudgeInterruptWithPriority(next []*v1alpha1.Package, configUpdates map[string][]string, interrupts map[string][]*v1alpha1.Interrupt) (*v1alpha1.Interrupt, string) {
8✔
1348
        var ret *v1alpha1.Interrupt
8✔
1349
        var pack string
8✔
1350

8✔
1351
        // map interrupt to priority
8✔
1352
        // A lower priority value means a higher priority and will be used in favor of anything with a higher value
8✔
1353
        var priorities = map[v1alpha1.InterruptType]int{
8✔
1354
                v1alpha1.REBOOT:               0,
8✔
1355
                v1alpha1.RESTART_ALL_SERVICES: 1,
8✔
1356
                v1alpha1.SERVICE:              2,
8✔
1357
                v1alpha1.NOOP:                 3,
8✔
1358
        }
8✔
1359

8✔
1360
        for _, _package := range next {
16✔
1361

8✔
1362
                if len(configUpdates[_package.Name]) == 0 {
16✔
1363
                        interrupts[_package.Name] = []*v1alpha1.Interrupt{}
8✔
1364
                        if _package.HasInterrupt() {
14✔
1365
                                interrupts[_package.Name] = append(interrupts[_package.Name], _package.Interrupt)
6✔
1366
                        }
6✔
1367
                }
1368
        }
1369

1370
        packageNames := make([]string, 0)
8✔
1371
        for _, pkg := range next {
16✔
1372
                packageNames = append(packageNames, pkg.Name)
8✔
1373
        }
8✔
1374
        sort.Strings(packageNames)
8✔
1375

8✔
1376
        for _, _package := range packageNames {
16✔
1377
                _interrupts, ok := interrupts[_package]
8✔
1378
                if !ok {
14✔
1379
                        continue
6✔
1380
                }
1381

1382
                for _, interrupt := range _interrupts {
14✔
1383
                        if ret == nil { // prime ret, base case
12✔
1384
                                ret = interrupt
6✔
1385
                                pack = _package
6✔
1386
                        }
6✔
1387

1388
                        // short circuit, reboot has highest priority
1389
                        switch interrupt.Type {
6✔
1390
                        case v1alpha1.REBOOT:
6✔
1391
                                return interrupt, _package
6✔
1392
                        }
1393

1394
                        // check if interrupt is higher priority using the priority_order
1395
                        // A lower priority value means a higher priority
1396
                        if priorities[interrupt.Type] < priorities[ret.Type] {
7✔
1397
                                ret = interrupt
1✔
1398
                                pack = _package
1✔
1399
                        } else if priorities[interrupt.Type] == priorities[ret.Type] {
13✔
1400
                                mergeInterrupt(ret, interrupt)
6✔
1401
                        }
6✔
1402
                }
1403
        }
1404

1405
        return ret, pack // return merged interrupt and package
8✔
1406
}
1407
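// A worked example of the selection above (package names are invented): if
// package "tuning" carries a SERVICE interrupt for containerd and package
// "driver" carries a REBOOT, the REBOOT short-circuits and is returned with
// "driver" as the owning package. If both carried SERVICE interrupts instead,
// they would be merged into a single SERVICE interrupt covering both service
// lists (see mergeInterrupt below).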

1408
func mergeInterrupt(left, right *v1alpha1.Interrupt) {
6✔
1409

6✔
1410
        // make sure both are of type service
6✔
1411
        if left.Type != v1alpha1.SERVICE || right.Type != v1alpha1.SERVICE {
7✔
1412
                return
1✔
1413
        }
1✔
1414

1415
        left.Services = merge(left.Services, right.Services)
6✔
1416
}
1417

1418
func merge[T cmp.Ordered](left, right []T) []T {
6✔
1419
        for _, r := range right {
12✔
1420
                if !slices.Contains(left, r) {
12✔
1421
                        left = append(left, r)
6✔
1422
                }
6✔
1423
        }
1424
        slices.Sort(left)
6✔
1425
        return left
6✔
1426
}
1427
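// Example of the two helpers above (values are arbitrary): mergeInterrupt
// unions the service lists of two SERVICE interrupts, and merge dedupes and
// sorts.
//
//	a := &v1alpha1.Interrupt{Type: v1alpha1.SERVICE, Services: []string{"kubelet", "containerd"}}
//	b := &v1alpha1.Interrupt{Type: v1alpha1.SERVICE, Services: []string{"containerd", "dbus"}}
//	mergeInterrupt(a, b)
//	// a.Services -> ["containerd", "dbus", "kubelet"]
//
//	merge([]int{3, 1}, []int{2, 1}) // -> [1, 2, 3]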

1428
// ValidateNodeConfigmaps validates that there are no orphaned or stale config maps for a node
1429
func (r *SkyhookReconciler) ValidateNodeConfigmaps(ctx context.Context, skyhookName string, nodes []wrapper.SkyhookNode) (bool, error) {
7✔
1430
        var list corev1.ConfigMapList
7✔
1431
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhookName})
7✔
1432
        if err != nil {
7✔
1433
                return false, fmt.Errorf("error listing config maps: %w", err)
×
1434
        }
×
1435

1436
        // No configmaps created by this skyhook, no work needs to be done
1437
        if len(list.Items) == 0 {
14✔
1438
                return false, nil
7✔
1439
        }
7✔
1440

1441
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
1442
        for _, cm := range list.Items {
14✔
1443
                existingCMs[cm.Name] = cm
7✔
1444
        }
7✔
1445

1446
        shouldExist := make(map[string]struct{})
7✔
1447
        for _, node := range nodes {
14✔
1448
                shouldExist[generateSafeName(253, skyhookName, node.GetNode().Name, "metadata")] = struct{}{}
7✔
1449
        }
7✔
1450

1451
        update := false
7✔
1452
        errs := make([]error, 0)
7✔
1453
        for k, v := range existingCMs {
14✔
1454
                if _, ok := shouldExist[k]; !ok {
7✔
1455
                        update = true
×
1456
                        err := r.Delete(ctx, &v)
×
1457
                        if err != nil {
×
1458
                                errs = append(errs, fmt.Errorf("error deleting existing config map [%s]: %w", v.Name, err))
×
1459
                        }
×
1460
                }
1461
        }
1462

1463
        // Ensure packages.json is present and up-to-date for expected configmaps
1464
        skyhookCR, err := r.dal.GetSkyhook(ctx, skyhookName)
7✔
1465
        if err != nil {
7✔
1466
                return update, fmt.Errorf("error getting skyhook for metadata validation: %w", err)
×
1467
        }
×
1468
        skyhookWrapper := wrapper.NewSkyhookWrapper(skyhookCR)
7✔
1469
        metadata := NewSkyhookMetadata(r.opts, skyhookWrapper)
7✔
1470
        expectedBytes, err := metadata.Marshal()
7✔
1471
        if err != nil {
7✔
1472
                return update, fmt.Errorf("error marshalling metadata for validation: %w", err)
×
1473
        }
×
1474
        expected := string(expectedBytes)
7✔
1475

7✔
1476
        for i := range list.Items {
14✔
1477
                cm := &list.Items[i]
7✔
1478
                if _, ok := shouldExist[cm.Name]; !ok {
7✔
1479
                        continue
×
1480
                }
1481
                if cm.Data == nil {
7✔
1482
                        cm.Data = make(map[string]string)
×
1483
                }
×
1484
                if cm.Data["packages.json"] != expected {
12✔
1485
                        cm.Data["packages.json"] = expected
5✔
1486
                        if err := r.Update(ctx, cm); err != nil {
5✔
1487
                                errs = append(errs, fmt.Errorf("error updating packages.json on config map [%s]: %w", cm.Name, err))
×
1488
                        } else {
5✔
1489
                                update = true
5✔
1490
                        }
5✔
1491
                }
1492
        }
1493

1494
        return update, utilerrors.NewAggregate(errs)
7✔
1495
}
1496

1497
// PodExists tests if a pod for this package exists on a node.
1498
func (r *SkyhookReconciler) PodExists(ctx context.Context, nodeName, skyhookName string, _package *v1alpha1.Package) (bool, error) {
7✔
1499

7✔
1500
        pods, err := r.dal.GetPods(ctx,
7✔
1501
                client.MatchingFields{
7✔
1502
                        "spec.nodeName": nodeName,
7✔
1503
                },
7✔
1504
                client.MatchingLabels{
7✔
1505
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhookName,
7✔
1506
                        fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
7✔
1507
                },
7✔
1508
        )
7✔
1509
        if err != nil {
7✔
1510
                return false, fmt.Errorf("error check from existing pods: %w", err)
×
1511
        }
×
1512

1513
        if pods == nil || len(pods.Items) == 0 {
14✔
1514
                return false, nil
7✔
1515
        }
7✔
1516
        return true, nil
7✔
1517
}
1518

1519
// createInterruptPodForPackage returns the pod spec for an interrupt pod given a package
1520
func createInterruptPodForPackage(opts SkyhookOperatorOptions, _interrupt *v1alpha1.Interrupt, argEncode string, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string) *corev1.Pod {
6✔
1521
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
6✔
1522
                opts.CopyDirRoot,
6✔
1523
                skyhook.Name,
6✔
1524
                _package.Name,
6✔
1525
                _package.Version,
6✔
1526
                skyhook.UID,
6✔
1527
                skyhook.Generation,
6✔
1528
        )
6✔
1529

6✔
1530
        volumes := []corev1.Volume{
6✔
1531
                {
6✔
1532
                        Name: "root-mount",
6✔
1533
                        VolumeSource: corev1.VolumeSource{
6✔
1534
                                HostPath: &corev1.HostPathVolumeSource{
6✔
1535
                                        Path: "/",
6✔
1536
                                },
6✔
1537
                        },
6✔
1538
                },
6✔
1539
                {
6✔
1540
                        // node names in different CSPs might include dots, which aren't allowed in volume names,
6✔
1541
                        // so we have to replace all dots with dashes
6✔
1542
                        Name: generateSafeName(63, skyhook.Name, nodeName, "metadata"),
6✔
1543
                        VolumeSource: corev1.VolumeSource{
6✔
1544
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1545
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1546
                                                Name: strings.ReplaceAll(fmt.Sprintf("%s-%s-metadata", skyhook.Name, nodeName), ".", "-"),
6✔
1547
                                        },
6✔
1548
                                },
6✔
1549
                        },
6✔
1550
                },
6✔
1551
        }
6✔
1552
        volumeMounts := []corev1.VolumeMount{
6✔
1553
                {
6✔
1554
                        Name:             "root-mount",
6✔
1555
                        MountPath:        "/root",
6✔
1556
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
6✔
1557
                },
6✔
1558
        }
6✔
1559

6✔
1560
        pod := &corev1.Pod{
6✔
1561
                ObjectMeta: metav1.ObjectMeta{
6✔
1562
                        Name:      generateSafeName(63, skyhook.Name, "interrupt", string(_interrupt.Type), nodeName),
6✔
1563
                        Namespace: opts.Namespace,
6✔
1564
                        Labels: map[string]string{
6✔
1565
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
6✔
1566
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX):   fmt.Sprintf("%s-%s", _package.Name, _package.Version),
6✔
1567
                                fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX): "True",
6✔
1568
                        },
6✔
1569
                },
6✔
1570
                Spec: corev1.PodSpec{
6✔
1571
                        NodeName:      nodeName,
6✔
1572
                        RestartPolicy: corev1.RestartPolicyOnFailure,
6✔
1573
                        InitContainers: []corev1.Container{
6✔
1574
                                {
6✔
1575
                                        Name:  InterruptContainerName,
6✔
1576
                                        Image: getAgentImage(opts, _package),
6✔
1577
                                        Args:  []string{"interrupt", "/root", copyDir, argEncode},
6✔
1578
                                        Env:   getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name),
6✔
1579
                                        SecurityContext: &corev1.SecurityContext{
6✔
1580
                                                Privileged: ptr(true),
6✔
1581
                                        },
6✔
1582
                                        VolumeMounts: volumeMounts,
6✔
1583
                                        Resources: corev1.ResourceRequirements{
6✔
1584
                                                Limits: corev1.ResourceList{
6✔
1585
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1586
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1587
                                                },
6✔
1588
                                                Requests: corev1.ResourceList{
6✔
1589
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1590
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1591
                                                },
6✔
1592
                                        },
6✔
1593
                                },
6✔
1594
                        },
6✔
1595
                        Containers: []corev1.Container{
6✔
1596
                                {
6✔
1597
                                        Name:  "pause",
6✔
1598
                                        Image: opts.PauseImage,
6✔
1599
                                        Resources: corev1.ResourceRequirements{
6✔
1600
                                                Limits: corev1.ResourceList{
6✔
1601
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1602
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1603
                                                },
6✔
1604
                                                Requests: corev1.ResourceList{
6✔
1605
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1606
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1607
                                                },
6✔
1608
                                        },
6✔
1609
                                },
6✔
1610
                        },
6✔
1611
                        HostPID:     true,
6✔
1612
                        HostNetwork: true,
6✔
1613
                        // If you change these go change the SelectNode toleration in cluster_state.go
6✔
1614
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
6✔
1615
                                {
6✔
1616
                                        Key:      TaintUnschedulable,
6✔
1617
                                        Operator: corev1.TolerationOpExists,
6✔
1618
                                },
6✔
1619
                                opts.GetRuntimeRequiredToleration(),
6✔
1620
                        }, skyhook.Spec.AdditionalTolerations...),
6✔
1621
                        Volumes: volumes,
6✔
1622
                },
6✔
1623
        }
6✔
1624
        if opts.ImagePullSecret != "" {
6✔
1625
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1626
                        {
×
1627
                                Name: opts.ImagePullSecret,
×
1628
                        },
×
1629
                }
×
1630
        }
×
1631
        return pod
6✔
1632
}
1633

1634
func trunstr(str string, length int) string {
8✔
1635
        if len(str) > length {
8✔
1636
                return str[:length]
×
1637
        }
×
1638
        return str
8✔
1639
}
1640

1641
func getAgentImage(opts SkyhookOperatorOptions, _package *v1alpha1.Package) string {
8✔
1642
        if _package.AgentImageOverride != "" {
13✔
1643
                return _package.AgentImageOverride
5✔
1644
        }
5✔
1645
        return opts.AgentImage
8✔
1646
}
1647

1648
// getPackageImage returns the full image reference for a package, using the digest if specified
1649
func getPackageImage(_package *v1alpha1.Package) string {
8✔
1650
        if _package.ContainerSHA != "" {
13✔
1651
                // When containerSHA is specified, use it instead of the version tag for immutable image reference
5✔
1652
                return fmt.Sprintf("%s@%s", _package.Image, _package.ContainerSHA)
5✔
1653
        }
5✔
1654
        // Fall back to version tag
1655
        return fmt.Sprintf("%s:%s", _package.Image, _package.Version)
8✔
1656
}
1657
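// Example references produced above (registry, repository, tag, and digest are
// all invented): with Image "ghcr.io/example/tuning" and ContainerSHA set to
// "sha256:0123...", the result is the immutable form
// "ghcr.io/example/tuning@sha256:0123..."; without ContainerSHA it falls back
// to the mutable tag form "ghcr.io/example/tuning:1.2.0" built from Version.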

1658
func getAgentConfigEnvVars(opts SkyhookOperatorOptions, packageName string, packageVersion string, resourceID string, skyhookName string) []corev1.EnvVar {
8✔
1659
        return []corev1.EnvVar{
8✔
1660
                {
8✔
1661
                        Name:  "SKYHOOK_LOG_DIR",
8✔
1662
                        Value: fmt.Sprintf("%s/%s", opts.AgentLogRoot, skyhookName),
8✔
1663
                },
8✔
1664
                {
8✔
1665
                        Name:  "SKYHOOK_ROOT_DIR",
8✔
1666
                        Value: fmt.Sprintf("%s/%s", opts.CopyDirRoot, skyhookName),
8✔
1667
                },
8✔
1668
                {
8✔
1669
                        Name:  "COPY_RESOLV",
8✔
1670
                        Value: "false",
8✔
1671
                },
8✔
1672
                {
8✔
1673
                        Name:  "SKYHOOK_RESOURCE_ID",
8✔
1674
                        Value: fmt.Sprintf("%s_%s_%s", resourceID, packageName, packageVersion),
8✔
1675
                },
8✔
1676
        }
8✔
1677
}
8✔
1678
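// Sketch of the environment the agent containers receive (the paths shown are
// hypothetical and depend on operator options):
//
//	SKYHOOK_LOG_DIR     = "<AgentLogRoot>/<skyhook-name>"
//	SKYHOOK_ROOT_DIR    = "<CopyDirRoot>/<skyhook-name>"
//	COPY_RESOLV         = "false"
//	SKYHOOK_RESOURCE_ID = "<resource-id>_<package-name>_<package-version>"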

1679
// createPodFromPackage creates a pod spec for a skyhook pod for a given package
1680
func createPodFromPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string, stage v1alpha1.Stage) *corev1.Pod {
8✔
1681
        // Generate consistent names that won't exceed k8s limits
8✔
1682
        volumeName := generateSafeName(63, "metadata", nodeName)
8✔
1683
        configMapName := generateSafeName(253, skyhook.Name, nodeName, "metadata")
8✔
1684

8✔
1685
        volumes := []corev1.Volume{
8✔
1686
                {
8✔
1687
                        Name: "root-mount",
8✔
1688
                        VolumeSource: corev1.VolumeSource{
8✔
1689
                                HostPath: &corev1.HostPathVolumeSource{
8✔
1690
                                        Path: "/",
8✔
1691
                                },
8✔
1692
                        },
8✔
1693
                },
8✔
1694
                {
8✔
1695
                        Name: volumeName,
8✔
1696
                        VolumeSource: corev1.VolumeSource{
8✔
1697
                                ConfigMap: &corev1.ConfigMapVolumeSource{
8✔
1698
                                        LocalObjectReference: corev1.LocalObjectReference{
8✔
1699
                                                Name: configMapName,
8✔
1700
                                        },
8✔
1701
                                },
8✔
1702
                        },
8✔
1703
                },
8✔
1704
        }
8✔
1705

8✔
1706
        volumeMounts := []corev1.VolumeMount{
8✔
1707
                {
8✔
1708
                        Name:             "root-mount",
8✔
1709
                        MountPath:        "/root",
8✔
1710
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
8✔
1711
                },
8✔
1712
                {
8✔
1713
                        Name:      volumeName,
8✔
1714
                        MountPath: "/skyhook-package/node-metadata",
8✔
1715
                },
8✔
1716
        }
8✔
1717

8✔
1718
        if len(_package.ConfigMap) > 0 {
14✔
1719
                volumeMounts = append(volumeMounts, corev1.VolumeMount{
6✔
1720
                        Name:      _package.Name,
6✔
1721
                        MountPath: "/skyhook-package/configmaps",
6✔
1722
                })
6✔
1723

6✔
1724
                volumes = append(volumes, corev1.Volume{
6✔
1725
                        Name: _package.Name,
6✔
1726
                        VolumeSource: corev1.VolumeSource{
6✔
1727
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1728
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1729
                                                Name: strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.Name, _package.Name, _package.Version)),
6✔
1730
                                        },
6✔
1731
                                },
6✔
1732
                        },
6✔
1733
                })
6✔
1734
        }
6✔
1735

1736
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
8✔
1737
                opts.CopyDirRoot,
8✔
1738
                skyhook.Name,
8✔
1739
                _package.Name,
8✔
1740
                _package.Version,
8✔
1741
                skyhook.UID,
8✔
1742
                skyhook.Generation,
8✔
1743
        )
8✔
1744
        applyargs := []string{strings.ToLower(string(stage)), "/root", copyDir}
8✔
1745
        checkargs := []string{strings.ToLower(string(stage) + "-check"), "/root", copyDir}
8✔
1746

8✔
1747
        agentEnvs := append(
8✔
1748
                _package.Env,
8✔
1749
                getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name)...,
8✔
1750
        )
8✔
1751

8✔
1752
        pod := &corev1.Pod{
8✔
1753
                ObjectMeta: metav1.ObjectMeta{
8✔
1754
                        Name:      generateSafeName(63, skyhook.Name, _package.Name, _package.Version, string(stage), nodeName),
8✔
1755
                        Namespace: opts.Namespace,
8✔
1756
                        Labels: map[string]string{
8✔
1757
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.Name,
8✔
1758
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
8✔
1759
                        },
8✔
1760
                },
8✔
1761
                Spec: corev1.PodSpec{
8✔
1762
                        NodeName:      nodeName,
8✔
1763
                        RestartPolicy: corev1.RestartPolicyOnFailure,
8✔
1764
                        InitContainers: []corev1.Container{
8✔
1765
                                {
8✔
1766
                                        Name:            fmt.Sprintf("%s-init", trunstr(_package.Name, 43)),
8✔
1767
                                        Image:           getPackageImage(_package),
8✔
1768
                                        ImagePullPolicy: "Always",
8✔
1769
                                        Command:         []string{"/bin/sh"},
8✔
1770
                                        Args: []string{
8✔
1771
                                                "-c",
8✔
1772
                                                "mkdir -p /root/${SKYHOOK_DIR} && cp -r /skyhook-package/* /root/${SKYHOOK_DIR}",
8✔
1773
                                        },
8✔
1774
                                        Env: []corev1.EnvVar{
8✔
1775
                                                {
8✔
1776
                                                        Name:  "SKYHOOK_DIR",
8✔
1777
                                                        Value: copyDir,
8✔
1778
                                                },
8✔
1779
                                        },
8✔
1780
                                        SecurityContext: &corev1.SecurityContext{
8✔
1781
                                                Privileged: ptr(true),
8✔
1782
                                        },
8✔
1783
                                        VolumeMounts: volumeMounts,
8✔
1784
                                },
8✔
1785
                                {
8✔
1786
                                        Name:            fmt.Sprintf("%s-%s", trunstr(_package.Name, 43), stage),
8✔
1787
                                        Image:           getAgentImage(opts, _package),
8✔
1788
                                        ImagePullPolicy: "Always",
8✔
1789
                                        Args:            applyargs,
8✔
1790
                                        Env:             agentEnvs,
8✔
1791
                                        SecurityContext: &corev1.SecurityContext{
8✔
1792
                                                Privileged: ptr(true),
8✔
1793
                                        },
8✔
1794
                                        VolumeMounts: volumeMounts,
8✔
1795
                                },
8✔
1796
                                {
8✔
1797
                                        Name:            fmt.Sprintf("%s-%scheck", trunstr(_package.Name, 43), stage),
8✔
1798
                                        Image:           getAgentImage(opts, _package),
8✔
1799
                                        ImagePullPolicy: "Always",
8✔
1800
                                        Args:            checkargs,
8✔
1801
                                        Env:             agentEnvs,
8✔
1802
                                        SecurityContext: &corev1.SecurityContext{
8✔
1803
                                                Privileged: ptr(true),
8✔
1804
                                        },
8✔
1805
                                        VolumeMounts: volumeMounts,
8✔
1806
                                },
8✔
1807
                        },
8✔
1808
                        Containers: []corev1.Container{
8✔
1809
                                {
8✔
1810
                                        Name:  "pause",
8✔
1811
                                        Image: opts.PauseImage,
8✔
1812
                                        Resources: corev1.ResourceRequirements{
8✔
1813
                                                Limits: corev1.ResourceList{
8✔
1814
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1815
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1816
                                                },
8✔
1817
                                                Requests: corev1.ResourceList{
8✔
1818
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1819
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1820
                                                },
8✔
1821
                                        },
8✔
1822
                                },
8✔
1823
                        },
8✔
1824
                        Volumes:     volumes,
8✔
1825
                        HostPID:     true,
8✔
1826
                        HostNetwork: true,
8✔
1827
                        // If you change these go change the SelectNode toleration in cluster_state.go
8✔
1828
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
8✔
1829
                                {
8✔
1830
                                        Key:      TaintUnschedulable,
8✔
1831
                                        Operator: corev1.TolerationOpExists,
8✔
1832
                                },
8✔
1833
                                opts.GetRuntimeRequiredToleration(),
8✔
1834
                        }, skyhook.Spec.AdditionalTolerations...),
8✔
1835
                },
8✔
1836
        }
8✔
1837
        if opts.ImagePullSecret != "" {
8✔
1838
                pod.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
×
1839
                        {
×
1840
                                Name: opts.ImagePullSecret,
×
1841
                        },
×
1842
                }
×
1843
        }
×
1844
        if _package.GracefulShutdown != nil {
13✔
1845
                pod.Spec.TerminationGracePeriodSeconds = ptr(int64(_package.GracefulShutdown.Duration.Seconds()))
5✔
1846
        }
5✔
1847
        setPodResources(pod, _package.Resources)
8✔
1848
        return pod
8✔
1849
}
1850

1851
// FilterEnv removes the environment variables whose names are passed in exclude
1852
func FilterEnv(envs []corev1.EnvVar, exclude ...string) []corev1.EnvVar {
8✔
1853
        var filteredEnv []corev1.EnvVar
8✔
1854

8✔
1855
        // build map of exclude strings for faster lookup
8✔
1856
        excludeMap := make(map[string]struct{})
8✔
1857
        for _, name := range exclude {
16✔
1858
                excludeMap[name] = struct{}{}
8✔
1859
        }
8✔
1860

1861
        // If the environment variable name is in the exclude list, skip it
1862
        // otherwise append it to the final list
1863
        for _, env := range envs {
16✔
1864
                if _, found := excludeMap[env.Name]; !found {
16✔
1865
                        filteredEnv = append(filteredEnv, env)
8✔
1866
                }
8✔
1867
        }
1868

1869
        return filteredEnv
8✔
1870
}
1871
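// Example of FilterEnv (variable names are arbitrary): only entries whose
// names are not in the exclude list survive, order preserved.
//
//	envs := []corev1.EnvVar{
//		{Name: "FOO", Value: "1"},
//		{Name: "SKYHOOK_RESOURCE_ID", Value: "abc"},
//	}
//	_ = FilterEnv(envs, "SKYHOOK_RESOURCE_ID") // -> [{Name: "FOO", Value: "1"}]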

1872
// podMatchesPackage asserts that a given pod matches the pod spec expected for the given package
1873
func podMatchesPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, pod corev1.Pod, skyhook *wrapper.Skyhook, stage v1alpha1.Stage) bool {
8✔
1874
        var expectedPod *corev1.Pod
8✔
1875

8✔
1876
        // need to differentiate whether the pod is for an interrupt or not so we know
8✔
1877
        // what to expect and how to compare them
8✔
1878
        isInterrupt := false
8✔
1879
        _, limitRange := pod.Annotations["kubernetes.io/limit-ranger"]
8✔
1880

8✔
1881
        if pod.Labels[fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX)] == "True" {
14✔
1882
                expectedPod = createInterruptPodForPackage(opts, &v1alpha1.Interrupt{}, "", _package, skyhook, "")
6✔
1883
                isInterrupt = true
6✔
1884
        } else {
14✔
1885
                expectedPod = createPodFromPackage(opts, _package, skyhook, "", stage)
8✔
1886
        }
8✔
1887

1888
        actualPod := pod.DeepCopy()
8✔
1889

8✔
1890
        // check to see whether the name or the version of the package changed
8✔
1891
        packageLabel := fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX)
8✔
1892
        if actualPod.Labels[packageLabel] != expectedPod.Labels[packageLabel] {
14✔
1893
                return false
6✔
1894
        }
6✔
1895

1896
        // compare initContainers since this is where a lot of the important info lives
1897
        for i := range actualPod.Spec.InitContainers {
16✔
1898
                expectedContainer := expectedPod.Spec.InitContainers[i]
8✔
1899
                actualContainer := actualPod.Spec.InitContainers[i]
8✔
1900

8✔
1901
                if expectedContainer.Name != actualContainer.Name {
9✔
1902
                        return false
1✔
1903
                }
1✔
1904

1905
                if expectedContainer.Image != actualContainer.Image {
8✔
1906
                        return false
×
1907
                }
×
1908

1909
                // compare the containers env vars except for the ones that are inserted
1910
                // by the operator by default as the SKYHOOK_RESOURCE_ID will change every
1911
                // time the skyhook is updated and would cause every pod to be removed
1912
                // TODO: This is ignoring all the static env vars that are set by operator config.
1913
                // It probably should be just SKYHOOK_RESOURCE_ID that is ignored. Otherwise,
1914
                // a user will have to manually delete the pod to update the package when operator is updated.
1915
                dummyAgentEnv := getAgentConfigEnvVars(opts, "", "", "", "")
8✔
1916
                excludedEnvs := make([]string, len(dummyAgentEnv))
8✔
1917
                for i, env := range dummyAgentEnv {
16✔
1918
                        excludedEnvs[i] = env.Name
8✔
1919
                }
8✔
1920
                expectedFilteredEnv := FilterEnv(expectedContainer.Env, excludedEnvs...)
8✔
1921
                actualFilteredEnv := FilterEnv(actualContainer.Env, excludedEnvs...)
8✔
1922
                if !reflect.DeepEqual(expectedFilteredEnv, actualFilteredEnv) {
14✔
1923
                        return false
6✔
1924
                }
6✔
1925

1926
                if !isInterrupt { // don't compare these since they are not configured on interrupt
16✔
1927
                        // compare resource requests and limits (CPU, memory, etc.)
8✔
1928
                        expectedResources := expectedContainer.Resources
8✔
1929
                        actualResources := actualContainer.Resources
8✔
1930
                        if skyhook.Spec.Packages[_package.Name].Resources != nil {
14✔
1931
                                // If CR has resources specified, they should match exactly
6✔
1932
                                if !reflect.DeepEqual(expectedResources, actualResources) {
7✔
1933
                                        return false
1✔
1934
                                }
1✔
1935
                        } else {
8✔
1936
                                // If CR has no resources specified, ensure pod has no resource overrides
8✔
1937
                                if !limitRange {
16✔
1938
                                        if actualResources.Requests != nil || actualResources.Limits != nil {
9✔
1939
                                                return false
1✔
1940
                                        }
1✔
1941
                                }
1942
                        }
1943
                }
1944
        }
1945

1946
        return true
8✔
1947
}
1948

1949
// ValidateRunningPackages deletes pods that don't match the current spec and checks if there are pods running
1950
// that don't match the node state and removes them if they exist
1951
func (r *SkyhookReconciler) ValidateRunningPackages(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1952

7✔
1953
        update := false
7✔
1954
        errs := make([]error, 0)
7✔
1955
        // get all pods for this skyhook's packages
7✔
1956
        pods, err := r.dal.GetPods(ctx,
7✔
1957
                client.MatchingLabels{
7✔
1958
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
7✔
1959
                },
7✔
1960
        )
7✔
1961
        if err != nil {
7✔
1962
                return false, fmt.Errorf("error getting pods while validating packages: %w", err)
×
1963
        }
×
1964
        if pods == nil || len(pods.Items) == 0 {
14✔
1965
                return false, nil // nothing running for this skyhook on this node
7✔
1966
        }
7✔
1967

1968
        // Initialize metrics for each stage
1969
        stages := make(map[string]map[string]map[v1alpha1.Stage]int)
7✔
1970

7✔
1971
        // group pods by node
7✔
1972
        podsbyNode := make(map[string][]corev1.Pod)
7✔
1973
        for _, pod := range pods.Items {
14✔
1974
                podsbyNode[pod.Spec.NodeName] = append(podsbyNode[pod.Spec.NodeName], pod)
7✔
1975
        }
7✔
1976

1977
        for _, node := range skyhook.GetNodes() {
14✔
1978
                nodeState, err := node.State()
7✔
1979
                if err != nil {
7✔
1980
                        return false, fmt.Errorf("error getting node state: %w", err)
×
1981
                }
×
1982

1983
                for _, pod := range podsbyNode[node.GetNode().Name] {
14✔
1984
                        found := false
7✔
1985

7✔
1986
                        runningPackage, err := GetPackage(&pod)
7✔
1987
                        if err != nil {
7✔
1988
                                errs = append(errs, fmt.Errorf("error getting package from pod [%s:%s] while validating packages: %w", pod.Namespace, pod.Name, err))
×
1989
                        }
×
1990

1991
                        // check if the package is part of the skyhook spec, if not we need to delete it
1992
                        for _, v := range skyhook.GetSkyhook().Spec.Packages {
14✔
1993
                                if podMatchesPackage(r.opts, &v, pod, skyhook.GetSkyhook(), runningPackage.Stage) {
14✔
1994
                                        found = true
7✔
1995
                                }
7✔
1996
                        }
1997

1998
                        // Increment the stage count for metrics
1999
                        if _, ok := stages[runningPackage.Name]; !ok {
14✔
2000
                                stages[runningPackage.Name] = make(map[string]map[v1alpha1.Stage]int)
7✔
2001
                                if _, ok := stages[runningPackage.Name][runningPackage.Version]; !ok {
14✔
2002
                                        stages[runningPackage.Name][runningPackage.Version] = make(map[v1alpha1.Stage]int)
7✔
2003
                                        for _, stage := range v1alpha1.Stages {
14✔
2004
                                                stages[runningPackage.Name][runningPackage.Version][stage] = 0
7✔
2005
                                        }
7✔
2006
                                }
2007
                        }
2008
                        stages[runningPackage.Name][runningPackage.Version][runningPackage.Stage]++
7✔
2009

7✔
2010
                        // uninstall is by definition not part of the skyhook spec, so we can't delete it (because it used to be but was removed, hence uninstalling it)
7✔
2011
                        if runningPackage.Stage == v1alpha1.StageUninstall {
12✔
2012
                                found = true
5✔
2013
                        }
5✔
2014

2015
                        if !found {
12✔
2016
                                update = true
5✔
2017

5✔
2018
                                err := r.InvalidPackage(ctx, &pod)
5✔
2019
                                if err != nil {
9✔
2020
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
2021
                                }
4✔
2022
                                continue
5✔
2023
                        }
2024

2025
                        // Check if the package exists in node state, i.e. a package running that the node state doesn't know about.
2026
                        // something that is often done to try to fix bad node state is to clear the node state completely
2027
                        // in which case, if a package is running, we want to terminate it gracefully. Often what leads to this is
2028
                        // that the package is in a crash loop and the operator wants to restart the whole package.
2029
                        // when we apply a package it just checks if there is a running package on the node for the state of the package;
2030
                        // this can leave a pod running in, say, config mode, and if there is a depends-on relationship you might not
2031
                        // run things in the correct order.
2032
                        deleteMe := false
7✔
2033
                        packageStatus, exists := nodeState[runningPackage.GetUniqueName()]
7✔
2034
                        if !exists { // package not in node state, so we need to delete it
13✔
2035
                                deleteMe = true
6✔
2036
                        } else { // package in node state, so we need to check if it's running
13✔
2037
                                // need to check if the stages match; if not we need to delete it
7✔
2038
                                if packageStatus.Stage != runningPackage.Stage {
13✔
2039
                                        deleteMe = true
6✔
2040
                                }
6✔
2041
                        }
2042

2043
                        if deleteMe {
13✔
2044
                                update = true
6✔
2045
                                err := r.InvalidPackage(ctx, &pod)
6✔
2046
                                if err != nil {
10✔
2047
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
2048
                                }
4✔
2049
                        }
2050
                }
2051
        }
2052

2053
        return update, utilerrors.NewAggregate(errs)
7✔
2054
}
2055

2056
// InvalidPackage invalidates a package and updates the pod, which will trigger the pod to be deleted
2057
func (r *SkyhookReconciler) InvalidPackage(ctx context.Context, pod *corev1.Pod) error {
6✔
2058
        err := InvalidatePackage(pod)
6✔
2059
        if err != nil {
6✔
2060
                return fmt.Errorf("error invalidating package: %w", err)
×
2061
        }
×
2062

2063
        err = r.Update(ctx, pod)
6✔
2064
        if err != nil {
11✔
2065
                return fmt.Errorf("error updating pod: %w", err)
5✔
2066
        }
5✔
2067

2068
        return nil
6✔
2069
}
2070

2071
// ProcessInterrupt will check and do the interrupt if needed, and returns
2072
// false means we are waiting
2073
// true means we are good to proceed
2074
func (r *SkyhookReconciler) ProcessInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (bool, error) {
7✔
2075

7✔
2076
        if !skyhookNode.HasInterrupt(*_package) {
14✔
2077
                return true, nil
7✔
2078
        }
7✔
2079

2080
        // default starting stage
2081
        stage := v1alpha1.StageApply
5✔
2082
        nextStage := skyhookNode.NextStage(_package)
5✔
2083
        if nextStage != nil {
10✔
2084
                stage = *nextStage
5✔
2085
        }
5✔
2086

2087
        // wait till this is done if it's happening
2088
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
5✔
2089
        if found && status.State == v1alpha1.StateSkipped {
10✔
2090
                return false, nil
5✔
2091
        }
5✔
2092

2093
        // There is a race condition when a node reboots and the API cleans up the interrupt pod
2094
        // so we need to check if the pod exists and if it does, we need to recreate it
2095
        if status != nil && (status.State == v1alpha1.StateInProgress || status.State == v1alpha1.StateErroring) && status.Stage == v1alpha1.StageInterrupt {
10✔
2096
                // call interrupt to recreate the pod if missing
5✔
2097
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2098
                if err != nil {
5✔
2099
                        return false, err
×
2100
                }
×
2101
        }
2102

2103
        // drain and cordon node before applying package that has an interrupt
2104
        if stage == v1alpha1.StageApply {
10✔
2105
                ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, _package)
5✔
2106
                if err != nil {
5✔
2107
                        return false, err
×
2108
                }
×
2109

2110
                if !ready {
10✔
2111
                        return false, nil
5✔
2112
                }
5✔
2113
        }
2114

2115
        // time to interrupt (once other packages have finished)
2116
        if stage == v1alpha1.StageInterrupt && runInterrupt {
10✔
2117
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2118
                if err != nil {
5✔
2119
                        return false, err
×
2120
                }
×
2121

2122
                return false, nil
5✔
2123
        }
2124

2125
        // skipping
2126
        if stage == v1alpha1.StageInterrupt && !runInterrupt {
10✔
2127
                err := skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, stage, 0, _package.ContainerSHA)
5✔
2128
                if err != nil {
5✔
2129
                        return false, fmt.Errorf("error upserting to skip interrupt: %w", err)
×
2130
                }
×
2131
                return false, nil
5✔
2132
        }
2133

2134
        // wait till this is done if it's happening
2135
        if status != nil && status.Stage == v1alpha1.StageInterrupt && status.State != v1alpha1.StateComplete {
10✔
2136
                return false, nil
5✔
2137
        }
5✔
2138

2139
        return true, nil
5✔
2140
}
2141
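
// The following is an editor-added illustrative sketch, not part of the original
// file: it shows how a caller might act on ProcessInterrupt's boolean contract
// (false = still waiting, true = safe to proceed). The function name and the
// requeue convention are assumptions for illustration only.
func processInterruptCallerSketch(ctx context.Context, r *SkyhookReconciler, node wrapper.SkyhookNode, pkg *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (requeue bool, err error) {
        proceed, err := r.ProcessInterrupt(ctx, node, pkg, interrupt, runInterrupt)
        if err != nil {
                return false, err
        }
        // false means the drain/cordon or the interrupt pod itself is still in
        // flight, so the reconciler should come back later rather than continue.
        return !proceed, nil
}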

func (r *SkyhookReconciler) EnsureNodeIsReadyForInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
        // cordon node
        skyhookNode.Cordon()

        hasWork, err := r.HasNonInterruptWork(ctx, skyhookNode)
        if err != nil {
                return false, err
        }
        if hasWork { // keep waiting...
                return false, nil
        }

        ready, err := r.DrainNode(ctx, skyhookNode, _package)
        if err != nil {
                return false, fmt.Errorf("error draining node [%s]: %w", skyhookNode.GetNode().Name, err)
        }

        return ready, nil
}

// ApplyPackage starts a pod on the node for the package
func (r *SkyhookReconciler) ApplyPackage(ctx context.Context, logger logr.Logger, clusterState *clusterState, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, runInterrupt bool) error {

        if _package == nil {
                return errors.New("can not apply nil package")
        }

        // default starting stage
        stage := v1alpha1.StageApply

        // These modes don't have anything that comes before them, so we must specify them as the
        // starting point. The next stage function will return nil until these modes complete.
        // Config is a special case as sometimes apply will come before it and other times it won't,
        // which is why it needs to be here as well.
        if packageStatus, found := skyhookNode.PackageStatus(_package.GetUniqueName()); found {
                switch packageStatus.Stage {
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
                        stage = packageStatus.Stage
                }
        }

        // if stage != v1alpha1.StageApply {
        //         // If a node gets reset by a user, the above method will return the wrong node state. Above sources it from the skyhook status.
        //         // check if the node has nothing, reset it then apply the package.
        //         nodeState, err := skyhookNode.State()
        //         if err != nil {
        //                 return fmt.Errorf("error getting node state: %w", err)
        //         }

        //         _, found := nodeState[_package.GetUniqueName()]
        //         if !found {
        //                 stage = v1alpha1.StageApply
        //         }
        // }

        nextStage := skyhookNode.NextStage(_package)
        if nextStage != nil {
                stage = *nextStage
        }

        // test if pod exists; if so, bail out
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
        if err != nil {
                return err
        }

        // wait until this is done if it is happening
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())

        if found && status.State == v1alpha1.StateSkipped { // skipped, so nothing to do
                return nil
        }

        if found && status.State == v1alpha1.StateInProgress { // running, so do nothing atm
                if exists {
                        return nil
                }
        }

        if exists {
                // nothing to do here, already running
                return nil
        }

        pod := createPodFromPackage(r.opts, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name, stage)

        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, stage, _package); err != nil {
                return fmt.Errorf("error setting package on pod: %w", err)
        }

        // set up ownership of the pod we created;
        // helps the runtime know what to do when something happens to this pod we are about to create
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
                return fmt.Errorf("error setting ownership: %w", err)
        }

        if err := r.Create(ctx, pod); err != nil {
                return fmt.Errorf("error creating pod: %w", err)
        }

        if err = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, stage, 0, _package.ContainerSHA); err != nil {
                err = fmt.Errorf("error upserting package: %w", err) // want to keep going in this case, but don't want to lose the err
        }

        skyhookNode.SetStatus(v1alpha1.StatusInProgress)

        skyhookNode.GetSkyhook().AddCondition(metav1.Condition{
                Type:               fmt.Sprintf("%s/ApplyPackage", v1alpha1.METADATA_PREFIX),
                Status:             metav1.ConditionTrue,
                ObservedGeneration: skyhookNode.GetSkyhook().Generation,
                LastTransitionTime: metav1.Now(),
                Reason:             "ApplyPackage",
                Message:            fmt.Sprintf("Applying package [%s:%s] to node [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name),
        })

        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] from [skyhook:%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetSkyhook().Name, stage)
        r.recorder.Eventf(skyhookNode.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] to node [%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name, stage)

        skyhookNode.GetSkyhook().Updated = true

        return err
}
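
// Editor-added illustrative sketch, not part of the original file: the stage
// resolution performed at the top of ApplyPackage, shown in isolation. The
// function name is hypothetical; the calls mirror the ones used above.
func resolveStartingStageSketch(logger logr.Logger, skyhookNode wrapper.SkyhookNode, pkg *v1alpha1.Package) {
        stage := v1alpha1.StageApply // default starting stage
        // Config, upgrade, and uninstall have nothing that comes before them, so
        // the recorded stage is taken as the starting point when one is underway.
        if status, found := skyhookNode.PackageStatus(pkg.GetUniqueName()); found {
                switch status.Stage {
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
                        stage = status.Stage
                }
        }
        // NextStage returns nil until the current stage completes.
        if next := skyhookNode.NextStage(pkg); next != nil {
                stage = *next
        }
        logger.Info("resolved starting stage", "package", pkg.GetUniqueName(), "stage", stage)
}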

// HandleRuntimeRequired finds any nodes for which all runtime required Skyhooks are complete and removes their runtime required taint.
// Will return an error if the patching of the nodes is not possible.
func (r *SkyhookReconciler) HandleRuntimeRequired(ctx context.Context, clusterState *clusterState) error {
        node_to_skyhooks, skyhook_node_map := groupSkyhooksByNode(clusterState)
        to_remove := getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks, skyhook_node_map)
        // Remove the runtime required taint from nodes in to_remove
        taint_to_remove := r.opts.GetRuntimeRequiredTaint()
        errs := make([]error, 0)
        for _, node := range to_remove {
                // check before removing the taint that it even exists to begin with
                if !taints.TaintExists(node.Spec.Taints, &taint_to_remove) {
                        continue
                }
                // RemoveTaint will ALWAYS return nil for its error so no need to check it
                new_node, updated, _ := taints.RemoveTaint(node, &taint_to_remove)
                if updated {
                        err := r.Patch(ctx, new_node, client.MergeFrom(node))
                        if err != nil {
                                errs = append(errs, err)
                        }
                }
        }
        if len(errs) > 0 {
                return utilerrors.NewAggregate(errs)
        }
        return nil
}
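
// Editor-added illustrative sketch, not part of the original file: removing the
// runtime-required taint from a single node with the same primitives used above.
// The standalone function and its name are hypothetical.
func removeTaintFromNodeSketch(ctx context.Context, c client.Client, node *corev1.Node, taint corev1.Taint) error {
        if !taints.TaintExists(node.Spec.Taints, &taint) {
                return nil // nothing to remove
        }
        patched, updated, _ := taints.RemoveTaint(node, &taint)
        if !updated {
                return nil
        }
        // Patch only the difference against the original node object.
        return c.Patch(ctx, patched, client.MergeFrom(node))
}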

// groupSkyhooksByNode groups Skyhooks by the node they target
func groupSkyhooksByNode(clusterState *clusterState) (map[types.UID][]SkyhookNodes, map[types.UID]*corev1.Node) {
        node_to_skyhooks := make(map[types.UID][]SkyhookNodes)
        nodes := make(map[types.UID]*corev1.Node)
        for _, skyhook := range clusterState.skyhooks {
                // Ignore skyhooks that don't have runtime required
                if !skyhook.GetSkyhook().Spec.RuntimeRequired {
                        continue
                }
                for _, node := range skyhook.GetNodes() {
                        if _, ok := node_to_skyhooks[node.GetNode().UID]; !ok {
                                node_to_skyhooks[node.GetNode().UID] = make([]SkyhookNodes, 0)
                                nodes[node.GetNode().UID] = node.GetNode()
                        }
                        node_to_skyhooks[node.GetNode().UID] = append(node_to_skyhooks[node.GetNode().UID], skyhook)
                }

        }
        return node_to_skyhooks, nodes
}

// getRuntimeRequiredTaintCompleteNodes gets the nodes whose runtime required taint can be removed because all skyhooks targeting that node have completed.
// Note: This checks per-node completion, not skyhook-level completion. A node's taint is removed when all
// runtime-required skyhooks are complete ON THAT SPECIFIC NODE, regardless of other nodes' completion status.
func getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks map[types.UID][]SkyhookNodes, nodes map[types.UID]*corev1.Node) []*corev1.Node {
        to_remove := make([]*corev1.Node, 0)
        for node_uid, skyhooks := range node_to_skyhooks {
                node := nodes[node_uid]
                all_complete := true
                for _, skyhook := range skyhooks {
                        // Check if THIS specific node is complete for this skyhook (not all nodes)
                        _, nodeWrapper := skyhook.GetNode(node.Name)
                        if nodeWrapper == nil || !nodeWrapper.IsComplete() {
                                all_complete = false
                                break
                        }
                }
                if all_complete {
                        to_remove = append(to_remove, node)
                }
        }
        return to_remove
}
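
// Editor-added illustrative sketch, not part of the original file: the per-node
// completion check described in the note above, applied to a single node. The
// helper name is hypothetical; the method calls mirror the ones used above.
func nodeCompleteForAllSkyhooksSketch(node *corev1.Node, skyhooks []SkyhookNodes) bool {
        for _, skyhook := range skyhooks {
                _, nodeWrapper := skyhook.GetNode(node.Name)
                if nodeWrapper == nil || !nodeWrapper.IsComplete() {
                        // At least one runtime-required skyhook still has work on
                        // this node, so its taint must stay in place.
                        return false
                }
        }
        return true
}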

// setPodResources sets resources for all containers and init containers in the pod if override is set, else leaves empty for LimitRange
func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
        if res == nil {
                return
        }
        if !res.CPURequest.IsZero() || !res.CPULimit.IsZero() || !res.MemoryRequest.IsZero() || !res.MemoryLimit.IsZero() {
                for i := range pod.Spec.InitContainers {
                        pod.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{
                                Limits: corev1.ResourceList{
                                        corev1.ResourceCPU:    res.CPULimit,
                                        corev1.ResourceMemory: res.MemoryLimit,
                                },
                                Requests: corev1.ResourceList{
                                        corev1.ResourceCPU:    res.CPURequest,
                                        corev1.ResourceMemory: res.MemoryRequest,
                                },
                        }
                }
        }
}
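
// Editor-added illustrative sketch, not part of the original file: constructing a
// resource override and applying it with setPodResources. The quantity values are
// examples only, and the field names are taken from the function above. Note that,
// as written above, only init containers receive the override; a nil (or all-zero)
// override leaves resources empty so a namespace LimitRange can apply defaults.
func setPodResourcesUsageSketch(pod *corev1.Pod) {
        override := &v1alpha1.ResourceRequirements{
                CPURequest:    resource.MustParse("500m"),
                CPULimit:      resource.MustParse("1"),
                MemoryRequest: resource.MustParse("256Mi"),
                MemoryLimit:   resource.MustParse("512Mi"),
        }
        setPodResources(pod, override)
}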

// partitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
func partitionNodesIntoCompartments(clusterState *clusterState) error {
        for _, skyhook := range clusterState.skyhooks {
                // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState)
                if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" {
                        continue
                }

                // Skip if no compartments exist (e.g., deployment policy not found).
                // The webhook should prevent this at admission time, and the controller sets a condition at runtime,
                // but we guard here to prevent panics if the policy goes missing.
                if len(skyhook.GetCompartments()) == 0 {
                        continue
                }

                // Clear all compartments before reassigning nodes to prevent stale nodes.
                // This ensures nodes are only in their current compartment based on current labels.
                for _, compartment := range skyhook.GetCompartments() {
                        compartment.ClearNodes()
                }

                for _, node := range skyhook.GetNodes() {
                        compartmentName, err := skyhook.AssignNodeToCompartment(node)
                        if err != nil {
                                return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
                        }
                        if err := skyhook.AddCompartmentNode(compartmentName, node); err != nil {
                                return fmt.Errorf("error adding node %s to compartment %s: %w", node.GetNode().Name, compartmentName, err)
                        }
                }
        }

        return nil
}
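
// Editor-added illustrative sketch, not part of the original file: the
// clear-then-reassign flow above for a single skyhook, in isolation. Clearing
// first guarantees a node only appears in the compartment matching its current
// labels. The function name is hypothetical.
func partitionOneSkyhookSketch(skyhook SkyhookNodes) error {
        for _, compartment := range skyhook.GetCompartments() {
                compartment.ClearNodes()
        }
        for _, node := range skyhook.GetNodes() {
                name, err := skyhook.AssignNodeToCompartment(node)
                if err != nil {
                        return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
                }
                if err := skyhook.AddCompartmentNode(name, node); err != nil {
                        return fmt.Errorf("error adding node %s to compartment %s: %w", node.GetNode().Name, name, err)
                }
        }
        return nil
}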

// validateAndUpsertSkyhookData performs validation and configmap operations for a skyhook
func (r *SkyhookReconciler) validateAndUpsertSkyhookData(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, ctrl.Result, error) {
        if yes, result, err := shouldReturn(r.ValidateRunningPackages(ctx, skyhook)); yes {
                return yes, result, err
        }

        if yes, result, err := shouldReturn(r.ValidateNodeConfigmaps(ctx, skyhook.GetSkyhook().Name, skyhook.GetNodes())); yes {
                return yes, result, err
        }

        if yes, result, err := shouldReturn(r.UpsertConfigmaps(ctx, skyhook, clusterState)); yes {
                return yes, result, err
        }

        return false, ctrl.Result{}, nil
}
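
// Editor-added illustrative sketch, not part of the original file: how a caller
// in the reconcile loop might consume validateAndUpsertSkyhookData's
// (done, result, err) triple. Shown as a comment because the surrounding
// reconcile code is not part of this excerpt.
//
//	done, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState)
//	if done {
//	        return result, err
//	}
//	// otherwise continue reconciling this skyhook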