• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NVIDIA / skyhook / 20252183768

16 Dec 2025 12:27AM UTC coverage: 74.919% (-0.04%) from 74.963%
20252183768

push

github

web-flow
feat: add support for ignoring nodes via label (#128)

25 of 36 new or added lines in 1 file covered. (69.44%)

1 existing line in 1 file now uncovered.

4612 of 6156 relevant lines covered (74.92%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.3
/operator/internal/controller/skyhook_controller.go
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package controller
20

21
import (
22
        "cmp"
23
        "context"
24
        "crypto/sha256"
25
        "encoding/hex"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "slices"
31
        "sort"
32
        "strings"
33
        "time"
34

35
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
36
        "github.com/NVIDIA/skyhook/operator/internal/dal"
37
        "github.com/NVIDIA/skyhook/operator/internal/version"
38
        "github.com/NVIDIA/skyhook/operator/internal/wrapper"
39
        "github.com/go-logr/logr"
40

41
        corev1 "k8s.io/api/core/v1"
42
        policyv1 "k8s.io/api/policy/v1"
43
        apierrors "k8s.io/apimachinery/pkg/api/errors"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/runtime"
47
        "k8s.io/apimachinery/pkg/types"
48
        utilerrors "k8s.io/apimachinery/pkg/util/errors"
49
        "k8s.io/client-go/tools/record"
50
        "k8s.io/kubernetes/pkg/util/taints"
51
        ctrl "sigs.k8s.io/controller-runtime"
52
        "sigs.k8s.io/controller-runtime/pkg/client"
53
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/log"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
)
58

59
const (
60
        EventsReasonSkyhookApply       = "Apply"
61
        EventsReasonSkyhookInterrupt   = "Interrupt"
62
        EventsReasonSkyhookDrain       = "Drain"
63
        EventsReasonSkyhookStateChange = "State"
64
        EventsReasonNodeReboot         = "Reboot"
65
        EventTypeNormal                = "Normal"
66
        // EventTypeWarning = "Warning"
67
        TaintUnschedulable     = corev1.TaintNodeUnschedulable
68
        InterruptContainerName = "interrupt"
69

70
        SkyhookFinalizer = "skyhook.nvidia.com/skyhook"
71
)
72

73
// SkyhookOperatorOptions holds the operator's configuration. Each field
// carries an `env` tag naming an environment variable and a default value;
// the loader that consumes these tags is not part of this file.
// Call Validate before using the options.
type SkyhookOperatorOptions struct {
	// Namespace must not be empty.
	Namespace string `env:"NAMESPACE, default=skyhook"`
	// MaxInterval is the requeue delay used by Reconcile when there is
	// nothing else to do; Validate requires at least one minute.
	MaxInterval     time.Duration `env:"DEFAULT_INTERVAL, default=10m"`
	ImagePullSecret string        `env:"IMAGE_PULL_SECRET, default=node-init-secret"` //TODO: should this be defaulted?
	// CopyDirRoot must be set and start with "/".
	CopyDirRoot string `env:"COPY_DIR_ROOT, default=/var/lib/skyhook"`
	// ReapplyOnReboot makes TrackReboots reset a node when its boot ID changes.
	ReapplyOnReboot bool `env:"REAPPLY_ON_REBOOT, default=false"`
	// RuntimeRequiredTaint must parse as a taint and must not be a taint deletion.
	RuntimeRequiredTaint string `env:"RUNTIME_REQUIRED_TAINT, default=skyhook.nvidia.com=runtime-required:NoSchedule"`

	// PauseImage and AgentImage must be set and carry a tag.
	PauseImage   string `env:"PAUSE_IMAGE, default=registry.k8s.io/pause:3.10"`
	AgentImage   string `env:"AGENT_IMAGE, default=ghcr.io/nvidia/skyhook/agent:latest"` // TODO: this needs to be updated with a working default
	AgentLogRoot string `env:"AGENT_LOG_ROOT, default=/var/log/skyhook"`
}
84

85
func (o *SkyhookOperatorOptions) Validate() error {
1✔
86

1✔
87
        messages := make([]string, 0)
1✔
88
        if o.Namespace == "" {
1✔
89
                messages = append(messages, "namespace must be set")
×
90
        }
×
91
        if o.CopyDirRoot == "" {
1✔
92
                messages = append(messages, "copy dir root must be set")
×
93
        }
×
94
        if o.RuntimeRequiredTaint == "" {
1✔
95
                messages = append(messages, "runtime required taint must be set")
×
96
        }
×
97
        if o.MaxInterval < time.Minute {
2✔
98
                messages = append(messages, "max interval must be at least 1 minute")
1✔
99
        }
1✔
100

101
        // CopyDirRoot must start with /
102
        if !strings.HasPrefix(o.CopyDirRoot, "/") {
2✔
103
                messages = append(messages, "copy dir root must start with /")
1✔
104
        }
1✔
105

106
        // RuntimeRequiredTaint must be parsable and must not be a deletion
107
        _, delete, err := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
1✔
108
        if err != nil {
2✔
109
                messages = append(messages, fmt.Sprintf("runtime required taint is invalid: %s", err.Error()))
1✔
110
        }
1✔
111
        if len(delete) > 0 {
2✔
112
                messages = append(messages, "runtime required taint must not be a deletion")
1✔
113
        }
1✔
114

115
        if o.AgentImage == "" {
2✔
116
                messages = append(messages, "agent image must be set")
1✔
117
        }
1✔
118

119
        if !strings.Contains(o.AgentImage, ":") {
2✔
120
                messages = append(messages, "agent image must contain a tag")
1✔
121
        }
1✔
122

123
        if o.PauseImage == "" {
2✔
124
                messages = append(messages, "pause image must be set")
1✔
125
        }
1✔
126

127
        if !strings.Contains(o.PauseImage, ":") {
2✔
128
                messages = append(messages, "pause image must contain a tag")
1✔
129
        }
1✔
130

131
        if len(messages) > 0 {
2✔
132
                return errors.New(strings.Join(messages, ", "))
1✔
133
        }
1✔
134

135
        return nil
1✔
136
}
137

138
// AgentVersion returns the image tag portion of AgentImage
139
func (o *SkyhookOperatorOptions) AgentVersion() string {
1✔
140
        parts := strings.Split(o.AgentImage, ":")
1✔
141
        return parts[len(parts)-1]
1✔
142
}
1✔
143

144
func (o *SkyhookOperatorOptions) GetRuntimeRequiredTaint() corev1.Taint {
1✔
145
        to_add, _, _ := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
1✔
146
        return to_add[0]
1✔
147
}
1✔
148

149
func (o *SkyhookOperatorOptions) GetRuntimeRequiredToleration() corev1.Toleration {
1✔
150
        taint := o.GetRuntimeRequiredTaint()
1✔
151
        return corev1.Toleration{
1✔
152
                Key:      taint.Key,
1✔
153
                Operator: corev1.TolerationOpEqual,
1✔
154
                Value:    taint.Value,
1✔
155
                Effect:   taint.Effect,
1✔
156
        }
1✔
157
}
1✔
158

159
// force type checking against this interface
160
var _ reconcile.Reconciler = &SkyhookReconciler{}
161

162
func NewSkyhookReconciler(schema *runtime.Scheme, c client.Client, recorder record.EventRecorder, opts SkyhookOperatorOptions) (*SkyhookReconciler, error) {
1✔
163

1✔
164
        err := opts.Validate()
1✔
165
        if err != nil {
1✔
166
                return nil, fmt.Errorf("invalid skyhook operator options: %w", err)
×
167
        }
×
168

169
        return &SkyhookReconciler{
1✔
170
                Client:   c,
1✔
171
                scheme:   schema,
1✔
172
                recorder: recorder,
1✔
173
                opts:     opts,
1✔
174
                dal:      dal.New(c),
1✔
175
        }, nil
1✔
176
}
177

178
// SkyhookReconciler reconciles a Skyhook object
179
type SkyhookReconciler struct {
180
        client.Client
181
        scheme   *runtime.Scheme
182
        recorder record.EventRecorder
183
        opts     SkyhookOperatorOptions
184
        dal      dal.DAL
185
}
186

187
// SetupWithManager sets up the controller with the Manager.
188
func (r *SkyhookReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
189

1✔
190
        // indexes allow for query on fields to use the local cache
1✔
191
        indexer := mgr.GetFieldIndexer()
1✔
192
        err := indexer.
1✔
193
                IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
2✔
194
                        pod, ok := o.(*corev1.Pod)
1✔
195
                        if !ok {
1✔
196
                                return nil
×
197
                        }
×
198
                        return []string{pod.Spec.NodeName}
1✔
199
                })
200

201
        if err != nil {
1✔
202
                return err
×
203
        }
×
204

205
        ehandler := &eventHandler{
1✔
206
                logger: mgr.GetLogger(),
1✔
207
                dal:    dal.New(r.Client),
1✔
208
        }
1✔
209

1✔
210
        return ctrl.NewControllerManagedBy(mgr).
1✔
211
                For(&v1alpha1.Skyhook{}).
1✔
212
                Watches(
1✔
213
                        &corev1.Pod{},
1✔
214
                        handler.EnqueueRequestsFromMapFunc(podHandlerFunc),
1✔
215
                ).
1✔
216
                Watches(
1✔
217
                        &corev1.Node{},
1✔
218
                        ehandler,
1✔
219
                ).
1✔
220
                Complete(r)
1✔
221
}
222

223
// CRD Permissions
224
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks,verbs=get;list;watch;create;update;patch;delete
225
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/status,verbs=get;update;patch
226
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/finalizers,verbs=update
227

228
// core permissions
229
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;update;patch;watch
230
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
231
//+kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
232
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
233
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
234

235
// Reconcile is part of the main kubernetes reconciliation loop which aims to
236
// move the current state of the cluster closer to the desired state.
237
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile
238
func (r *SkyhookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
239
        logger := log.FromContext(ctx)
1✔
240
        // split off requests for pods
1✔
241
        if strings.HasPrefix(req.Name, "pod---") {
2✔
242
                name := strings.Split(req.Name, "pod---")[1]
1✔
243
                pod, err := r.dal.GetPod(ctx, req.Namespace, name)
1✔
244
                if err == nil && pod != nil { // if pod, then call other wise not a pod
2✔
245
                        return r.PodReconcile(ctx, pod)
1✔
246
                }
1✔
247
                return ctrl.Result{}, err
1✔
248
        }
249

250
        // get all skyhooks (SCR)
251
        skyhooks, err := r.dal.GetSkyhooks(ctx)
1✔
252
        if err != nil {
1✔
253
                // error, going to requeue and backoff
×
254
                logger.Error(err, "error getting skyhooks")
×
255
                return ctrl.Result{}, err
×
256
        }
×
257

258
        // if there are no skyhooks, so actually nothing to do, so don't requeue
259
        if skyhooks == nil || len(skyhooks.Items) == 0 {
2✔
260
                return ctrl.Result{}, nil
1✔
261
        }
1✔
262

263
        // get all nodes
264
        nodes, err := r.dal.GetNodes(ctx)
1✔
265
        if err != nil {
1✔
266
                // error, going to requeue and backoff
×
267
                logger.Error(err, "error getting nodes")
×
268
                return ctrl.Result{}, err
×
269
        }
×
270

271
        // if no nodes, well not work to do either
272
        if nodes == nil || len(nodes.Items) == 0 {
1✔
273
                // no nodes, so nothing to do
×
274
                return ctrl.Result{}, nil
×
275
        }
×
276

277
        // get all deployment policies
278
        deploymentPolicies, err := r.dal.GetDeploymentPolicies(ctx)
1✔
279
        if err != nil {
1✔
280
                logger.Error(err, "error getting deployment policies")
×
281
                return ctrl.Result{}, err
×
282
        }
×
283

284
        // TODO: this build state could error in a lot of ways, and I think we might want to move towards partial state
285
        // mean if we cant get on SCR state, great, process that one and error
286

287
        // BUILD cluster state from all skyhooks, and all nodes
288
        // this filters and pairs up nodes to skyhooks, also provides help methods for introspection and mutation
289
        clusterState, err := BuildState(skyhooks, nodes, deploymentPolicies)
1✔
290
        if err != nil {
1✔
291
                // error, going to requeue and backoff
×
292
                logger.Error(err, "error building cluster state")
×
293
                return ctrl.Result{}, err
×
294
        }
×
295

296
        // PARTITION nodes into compartments for each skyhook that uses deployment policies
297
        err = partitionNodesIntoCompartments(clusterState)
1✔
298
        if err != nil {
1✔
299
                logger.Error(err, "error partitioning nodes into compartments")
×
300
                return ctrl.Result{}, err
×
301
        }
×
302

303
        if yes, result, err := shouldReturn(r.HandleMigrations(ctx, clusterState)); yes {
2✔
304
                return result, err
1✔
305
        }
1✔
306

307
        if yes, result, err := shouldReturn(r.TrackReboots(ctx, clusterState)); yes {
2✔
308
                return result, err
1✔
309
        }
1✔
310

311
        // node picker is for selecting nodes to do work, tries maintain a prior of nodes between SCRs
312
        nodePicker := NewNodePicker(r.opts.GetRuntimeRequiredToleration())
1✔
313

1✔
314
        errs := make([]error, 0)
1✔
315
        var result *ctrl.Result
1✔
316

1✔
317
        for _, skyhook := range clusterState.skyhooks {
2✔
318
                if yes, result, err := shouldReturn(r.HandleFinalizer(ctx, skyhook)); yes {
2✔
319
                        return result, err
1✔
320
                }
1✔
321

322
                if yes, result, err := shouldReturn(r.ReportState(ctx, clusterState, skyhook)); yes {
2✔
323
                        return result, err
1✔
324
                }
1✔
325

326
                if skyhook.IsPaused() {
2✔
327
                        if yes, result, err := shouldReturn(r.UpdatePauseStatus(ctx, clusterState, skyhook)); yes {
2✔
328
                                return result, err
1✔
329
                        }
1✔
330
                        continue
1✔
331
                }
332

333
                if yes, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState); yes {
2✔
334
                        return result, err
1✔
335
                }
1✔
336

337
                changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
1✔
338
                if changed {
2✔
339
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
1✔
340
                        if len(errs) > 0 {
2✔
341
                                return ctrl.Result{RequeueAfter: time.Second * 2}, utilerrors.NewAggregate(errs)
1✔
342
                        }
1✔
343
                        return ctrl.Result{RequeueAfter: time.Second * 2}, nil
1✔
344
                }
345

346
                _, err := HandleVersionChange(skyhook)
1✔
347
                if err != nil {
1✔
348
                        return ctrl.Result{RequeueAfter: time.Second * 2}, fmt.Errorf("error getting packages to uninstall: %w", err)
×
349
                }
×
350
        }
351

352
        skyhook := GetNextSkyhook(clusterState.skyhooks)
1✔
353
        if skyhook != nil && !skyhook.IsPaused() {
2✔
354

1✔
355
                result, err = r.RunSkyhookPackages(ctx, clusterState, nodePicker, skyhook)
1✔
356
                if err != nil {
2✔
357
                        logger.Error(err, "error processing skyhook", "skyhook", skyhook.GetSkyhook().Name)
1✔
358
                        errs = append(errs, err)
1✔
359
                }
1✔
360
        }
361

362
        err = r.HandleRuntimeRequired(ctx, clusterState)
1✔
363
        if err != nil {
1✔
364
                errs = append(errs, err)
×
365
        }
×
366

367
        if len(errs) > 0 {
2✔
368
                err := utilerrors.NewAggregate(errs)
1✔
369
                return ctrl.Result{}, err
1✔
370
        }
1✔
371

372
        if result != nil {
2✔
373
                return *result, nil
1✔
374
        }
1✔
375

376
        // default happy retry after max
377
        return ctrl.Result{RequeueAfter: r.opts.MaxInterval}, nil
1✔
378
}
379

380
func shouldReturn(updates bool, err error) (bool, ctrl.Result, error) {
1✔
381
        if err != nil {
2✔
382
                return true, ctrl.Result{}, err
1✔
383
        }
1✔
384
        if updates {
2✔
385
                return true, ctrl.Result{RequeueAfter: time.Second * 2}, nil
1✔
386
        }
1✔
387
        return false, ctrl.Result{}, nil
1✔
388
}
389

390
func (r *SkyhookReconciler) HandleMigrations(ctx context.Context, clusterState *clusterState) (bool, error) {
1✔
391

1✔
392
        updates := false
1✔
393

1✔
394
        if version.VERSION == "" {
1✔
395
                // this means the binary was complied without version information
×
396
                return false, nil
×
397
        }
×
398

399
        logger := log.FromContext(ctx)
1✔
400
        errors := make([]error, 0)
1✔
401
        for _, skyhook := range clusterState.skyhooks {
2✔
402

1✔
403
                err := skyhook.Migrate(logger)
1✔
404
                if err != nil {
1✔
405
                        return false, fmt.Errorf("error migrating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
406
                }
×
407

408
                if err := skyhook.GetSkyhook().Skyhook.Validate(); err != nil {
1✔
409
                        return false, fmt.Errorf("error validating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
410
                }
×
411

412
                for _, node := range skyhook.GetNodes() {
2✔
413
                        if node.Changed() {
2✔
414
                                err := r.Status().Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
1✔
415
                                if err != nil {
1✔
416
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
417
                                }
×
418

419
                                err = r.Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
1✔
420
                                if err != nil {
1✔
421
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
422
                                }
×
423
                                updates = true
1✔
424
                        }
425
                }
426

427
                if skyhook.GetSkyhook().Updated {
2✔
428
                        // need to do this because SaveNodesAndSkyhook only saves skyhook status, not the main skyhook object where the annotations are
1✔
429
                        // additionally it needs to be an update, a patch nils out the annotations for some reason, which the save function does a patch
1✔
430

1✔
431
                        if err = r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
2✔
432
                                return false, fmt.Errorf("error updating during migration skyhook status [%s]: %w", skyhook.GetSkyhook().Name, err)
1✔
433
                        }
1✔
434

435
                        // because of conflict issues (409) we need to do things a bit differently here.
436
                        // We might be able to use server side apply in the future, but for now we need to do this
437
                        // https://kubernetes.io/docs/reference/using-api/server-side-apply/
438
                        // https://github.com/kubernetes-sigs/controller-runtime/issues/347
439

440
                        // work around for now is to grab a new copy of the object, and then patch it
441

442
                        newskyhook, err := r.dal.GetSkyhook(ctx, skyhook.GetSkyhook().Name)
1✔
443
                        if err != nil {
1✔
444
                                return false, fmt.Errorf("error getting skyhook to migrate [%s]: %w", skyhook.GetSkyhook().Name, err)
×
445
                        }
×
446
                        newPatch := client.MergeFrom(newskyhook.DeepCopy())
1✔
447

1✔
448
                        // set version
1✔
449
                        wrapper.NewSkyhookWrapper(newskyhook).SetVersion()
1✔
450

1✔
451
                        if err = r.Patch(ctx, newskyhook, newPatch); err != nil {
1✔
452
                                return false, fmt.Errorf("error updating during migration skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
453
                        }
×
454

455
                        updates = true
1✔
456
                }
457
        }
458

459
        if len(errors) > 0 {
1✔
460
                return false, utilerrors.NewAggregate(errors)
×
461
        }
×
462

463
        return updates, nil
1✔
464
}
465

466
// ReportState computes and puts important information into the skyhook status so that monitoring tools such as k9s
467
// can see the information at a glance. For example, the number of completed nodes and the list of packages in the skyhook.
468
func (r *SkyhookReconciler) ReportState(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
1✔
469

1✔
470
        // save updated state to skyhook status
1✔
471
        skyhook.ReportState()
1✔
472

1✔
473
        if skyhook.GetSkyhook().Updated {
2✔
474
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
1✔
475
                if len(errs) > 0 {
1✔
476
                        return false, utilerrors.NewAggregate(errs)
×
477
                }
×
478
                return true, nil
1✔
479
        }
480

481
        return false, nil
1✔
482
}
483

484
func (r *SkyhookReconciler) UpdatePauseStatus(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
1✔
485
        changed := UpdateSkyhookPauseStatus(skyhook)
1✔
486

1✔
487
        if changed {
2✔
488
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
1✔
489
                if len(errs) > 0 {
1✔
490
                        return false, utilerrors.NewAggregate(errs)
×
491
                }
×
492
                return true, nil
1✔
493
        }
494

495
        return false, nil
1✔
496
}
497

498
func (r *SkyhookReconciler) TrackReboots(ctx context.Context, clusterState *clusterState) (bool, error) {
1✔
499

1✔
500
        updates := false
1✔
501
        errs := make([]error, 0)
1✔
502

1✔
503
        for _, skyhook := range clusterState.skyhooks {
2✔
504
                if skyhook.GetSkyhook().Status.NodeBootIds == nil {
2✔
505
                        skyhook.GetSkyhook().Status.NodeBootIds = make(map[string]string)
1✔
506
                }
1✔
507

508
                for _, node := range skyhook.GetNodes() {
2✔
509
                        id, ok := skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name]
1✔
510

1✔
511
                        if !ok { // new node
2✔
512
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
1✔
513
                                skyhook.GetSkyhook().Updated = true
1✔
514
                        }
1✔
515

516
                        if id != "" && id != node.GetNode().Status.NodeInfo.BootID { // node rebooted
1✔
517
                                if r.opts.ReapplyOnReboot {
×
518
                                        r.recorder.Eventf(skyhook.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node [%s] to be reapplied", node.GetNode().Name)
×
519
                                        r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node for [%s] to be reapplied", node.GetSkyhook().Name)
×
520
                                        node.Reset()
×
521
                                }
×
522
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
×
523
                                skyhook.GetSkyhook().Updated = true
×
524
                        }
525

526
                        if node.Changed() { // update
1✔
527
                                updates = true
×
528
                                err := r.Update(ctx, node.GetNode())
×
529
                                if err != nil {
×
530
                                        errs = append(errs, fmt.Errorf("error updating node after reboot [%s]: %w", node.GetNode().Name, err))
×
531
                                }
×
532
                        }
533
                }
534
                if skyhook.GetSkyhook().Updated { // update
2✔
535
                        updates = true
1✔
536
                        err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook)
1✔
537
                        if err != nil {
2✔
538
                                errs = append(errs, fmt.Errorf("error updating skyhook status after reboot [%s]: %w", skyhook.GetSkyhook().Name, err))
1✔
539
                        }
1✔
540
                }
541
        }
542

543
        return updates, utilerrors.NewAggregate(errs)
1✔
544
}
545

546
// RunSkyhookPackages runs all skyhook packages then saves and requeues if changes were made
547
func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, skyhook SkyhookNodes) (*ctrl.Result, error) {
1✔
548

1✔
549
        logger := log.FromContext(ctx)
1✔
550
        requeue := false
1✔
551

1✔
552
        toUninstall, err := HandleVersionChange(skyhook)
1✔
553
        if err != nil {
1✔
554
                return nil, fmt.Errorf("error getting packages to uninstall: %w", err)
×
555
        }
×
556

557
        changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
1✔
558
        if !changed && skyhook.IsComplete() {
1✔
559
                return nil, nil
×
560
        }
×
561

562
        selectedNode := nodePicker.SelectNodes(skyhook)
1✔
563

1✔
564
        for _, node := range selectedNode {
2✔
565

1✔
566
                if node.IsComplete() && !node.Changed() {
1✔
567
                        continue
×
568
                }
569

570
                toRun, err := node.RunNext()
1✔
571
                if err != nil {
1✔
572
                        return nil, fmt.Errorf("error getting next packages to run: %w", err)
×
573
                }
×
574

575
                // prepend the uninstall packages so they are ran first
576
                toRun = append(toUninstall, toRun...)
1✔
577

1✔
578
                interrupt, pack := fudgeInterruptWithPriority(toRun, skyhook.GetSkyhook().GetConfigUpdates(), skyhook.GetSkyhook().GetConfigInterrupts())
1✔
579

1✔
580
                for _, f := range toRun {
2✔
581

1✔
582
                        ok, err := r.ProcessInterrupt(ctx, node, f, interrupt, interrupt != nil && f.Name == pack)
1✔
583
                        if err != nil {
1✔
584
                                // TODO: error handle
×
585
                                return nil, fmt.Errorf("error processing if we should interrupt [%s:%s]: %w", f.Name, f.Version, err)
×
586
                        }
×
587
                        if !ok {
2✔
588
                                requeue = true
1✔
589
                                continue
1✔
590
                        }
591

592
                        err = r.ApplyPackage(ctx, logger, clusterState, node, f, interrupt != nil && f.Name == pack)
1✔
593
                        if err != nil {
1✔
594
                                return nil, fmt.Errorf("error applying package [%s:%s]: %w", f.Name, f.Version, err)
×
595
                        }
×
596

597
                        // process one package at a time
598
                        if skyhook.GetSkyhook().Spec.Serial {
1✔
599
                                return &ctrl.Result{Requeue: true}, nil
×
600
                        }
×
601
                }
602
        }
603

604
        saved, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
1✔
605
        if len(errs) > 0 {
2✔
606
                return &ctrl.Result{}, utilerrors.NewAggregate(errs)
1✔
607
        }
1✔
608
        if saved {
2✔
609
                requeue = true
1✔
610
        }
1✔
611

612
        if !skyhook.IsComplete() || requeue {
2✔
613
                return &ctrl.Result{RequeueAfter: time.Second * 2}, nil // not sure this is better then just requeue bool
1✔
614
        }
1✔
615

616
        return nil, utilerrors.NewAggregate(errs)
×
617
}
618

619
// SaveNodesAndSkyhook saves nodes and skyhook and will update the events if the skyhook status changes
620
func (r *SkyhookReconciler) SaveNodesAndSkyhook(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, []error) {
1✔
621
        saved := false
1✔
622
        errs := make([]error, 0)
1✔
623

1✔
624
        for _, node := range skyhook.GetNodes() {
2✔
625
                patch := client.StrategicMergeFrom(clusterState.tracker.GetOriginal(node.GetNode()))
1✔
626
                if node.Changed() {
2✔
627
                        err := r.Patch(ctx, node.GetNode(), patch)
1✔
628
                        if err != nil {
1✔
629
                                errs = append(errs, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
630
                        }
×
631
                        saved = true
1✔
632

1✔
633
                        err = r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhook.GetSkyhook(), node.GetNode())
1✔
634
                        if err != nil {
1✔
635
                                errs = append(errs, fmt.Errorf("error upserting labels, annotations, and packages config map for node [%s]: %w", node.GetNode().Name, err))
×
636
                        }
×
637

638
                        if node.IsComplete() {
2✔
639
                                r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook [%s] complete.", skyhook.GetSkyhook().Name)
1✔
640

1✔
641
                                // since node is complete remove from priority
1✔
642
                                if _, ok := skyhook.GetSkyhook().Status.NodePriority[node.GetNode().Name]; ok {
2✔
643
                                        delete(skyhook.GetSkyhook().Status.NodePriority, node.GetNode().Name)
1✔
644
                                        skyhook.GetSkyhook().Updated = true
1✔
645
                                }
1✔
646
                        }
647
                }
648

649
                // updates node's condition
650
                node.UpdateCondition()
1✔
651
                if node.Changed() {
2✔
652
                        // conditions are in status
1✔
653
                        err := r.Status().Patch(ctx, node.GetNode(), patch)
1✔
654
                        if err != nil {
2✔
655
                                errs = append(errs, fmt.Errorf("error patching node status [%s]: %w", node.GetNode().Name, err))
1✔
656
                        }
1✔
657
                        saved = true
1✔
658
                }
659

660
                if node.GetSkyhook() != nil && node.GetSkyhook().Updated {
2✔
661
                        skyhook.GetSkyhook().Updated = true
1✔
662
                }
1✔
663
        }
664

665
        if skyhook.GetSkyhook().Updated {
2✔
666
                patch := client.MergeFrom(clusterState.tracker.GetOriginal(skyhook.GetSkyhook().Skyhook))
1✔
667
                err := r.Status().Patch(ctx, skyhook.GetSkyhook().Skyhook, patch)
1✔
668
                if err != nil {
1✔
669
                        errs = append(errs, err)
×
670
                }
×
671
                saved = true
1✔
672

1✔
673
                if skyhook.GetPriorStatus() != "" && skyhook.GetPriorStatus() != skyhook.Status() {
2✔
674
                        // we transitioned, fire event
1✔
675
                        r.recorder.Eventf(skyhook.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook transitioned [%s] -> [%s]", skyhook.GetPriorStatus(), skyhook.Status())
1✔
676
                }
1✔
677
        }
678

679
        if len(errs) > 0 {
2✔
680
                saved = false
1✔
681
        }
1✔
682
        return saved, errs
1✔
683
}
684

685
// HandleVersionChange updates the state for the node or skyhook if a version is changed on a package
686
func HandleVersionChange(skyhook SkyhookNodes) ([]*v1alpha1.Package, error) {
1✔
687
        toUninstall := make([]*v1alpha1.Package, 0)
1✔
688

1✔
689
        for _, node := range skyhook.GetNodes() {
2✔
690
                nodeState, err := node.State()
1✔
691
                if err != nil {
1✔
692
                        return nil, err
×
693
                }
×
694

695
                for _, packageStatus := range nodeState {
2✔
696
                        upgrade := false
1✔
697

1✔
698
                        _package, exists := skyhook.GetSkyhook().Spec.Packages[packageStatus.Name]
1✔
699
                        if exists && _package.Version == packageStatus.Version {
2✔
700
                                continue // no uninstall needed for package
1✔
701
                        }
702

703
                        packageStatusRef := v1alpha1.PackageRef{
1✔
704
                                Name:    packageStatus.Name,
1✔
705
                                Version: packageStatus.Version,
1✔
706
                        }
1✔
707

1✔
708
                        if !exists && packageStatus.Stage != v1alpha1.StageUninstall {
2✔
709
                                // Start uninstall of old package
1✔
710
                                err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
1✔
711
                                if err != nil {
1✔
712
                                        return nil, fmt.Errorf("error updating node status: %w", err)
×
713
                                }
×
714
                        } else if exists && _package.Version != packageStatus.Version {
2✔
715
                                comparison := version.Compare(_package.Version, packageStatus.Version)
1✔
716
                                if comparison == -2 {
1✔
717
                                        return nil, errors.New("error comparing package versions: invalid version string provided enabling webhooks validates versions before being applied")
×
718
                                }
×
719

720
                                if comparison == 1 {
2✔
721
                                        _packageStatus, found := node.PackageStatus(_package.GetUniqueName())
1✔
722
                                        if found && _packageStatus.Stage == v1alpha1.StageUpgrade {
2✔
723
                                                continue
1✔
724
                                        }
725

726
                                        // start upgrade of package
727
                                        err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageUpgrade, 0, _package.ContainerSHA)
1✔
728
                                        if err != nil {
1✔
729
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
730
                                        }
×
731

732
                                        upgrade = true
1✔
733
                                } else if comparison == -1 && packageStatus.Stage != v1alpha1.StageUninstall {
2✔
734
                                        // Start uninstall of old package
1✔
735
                                        err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
1✔
736
                                        if err != nil {
1✔
737
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
738
                                        }
×
739

740
                                        // If version changed then update new version to wait
741
                                        err = node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, v1alpha1.StageUninstall, 0, _package.ContainerSHA)
1✔
742
                                        if err != nil {
1✔
743
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
744
                                        }
×
745
                                }
746
                        }
747

748
                        // only need to create a feaux package for uninstall since it won't be in the DAG (Upgrade will)
749
                        newPackageStatus, found := node.PackageStatus(packageStatusRef.GetUniqueName())
1✔
750
                        if !upgrade && found && newPackageStatus.Stage == v1alpha1.StageUninstall && newPackageStatus.State == v1alpha1.StateInProgress {
2✔
751
                                // create fake package with the info we can salvage from the node state
1✔
752
                                newPackage := &v1alpha1.Package{
1✔
753
                                        PackageRef: packageStatusRef,
1✔
754
                                        Image:      packageStatus.Image,
1✔
755
                                }
1✔
756

1✔
757
                                // Add package to uninstall list if it's not already present
1✔
758
                                found := false
1✔
759
                                for _, uninstallPackage := range toUninstall {
2✔
760
                                        if reflect.DeepEqual(uninstallPackage, newPackage) {
1✔
761
                                                found = true
×
762
                                        }
×
763
                                }
764

765
                                if !found {
2✔
766
                                        toUninstall = append(toUninstall, newPackage)
1✔
767
                                }
1✔
768
                        }
769

770
                        // remove all config updates for the package since it's being uninstalled or
771
                        // upgraded. NOTE: The config updates must be removed whenever the version changes
772
                        // or else the package interrupt may be skipped if there is one
773
                        skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
1✔
774

1✔
775
                        // set the node and skyhook status to in progress
1✔
776
                        node.SetStatus(v1alpha1.StatusInProgress)
1✔
777
                }
778
        }
779

780
        return toUninstall, nil
1✔
781
}
782

783
// ptr returns a pointer to a fresh copy of the given value. It is handy for
// filling pointer-typed fields from literals or constants.
func ptr[E any](e E) *E {
	out := new(E)
	*out = e
	return out
}
1✔
787

788
// generateSafeName generates a consistent name for Kubernetes resources that is unique
// while staying within the specified character limit.
//
// The name parts are joined with "-", dots are replaced with dashes, and the
// first 8 hex characters of the SHA-256 of that joined name are appended as a
// suffix ("<name>-<hash>"). When the result would exceed maxLen the name part
// is truncated; the hash suffix is always kept so that distinct inputs keep
// distinct outputs. The result is lower-cased.
//
// When maxLen leaves no room for at least one name character plus the
// separator (maxLen <= 9), only the hash, cut to maxLen characters, is
// returned; a non-positive maxLen yields the empty string. This avoids slicing
// with a negative bound and avoids producing a name that starts with "-".
func generateSafeName(maxLen int, nameParts ...string) string {
	name := strings.Join(nameParts, "-")
	// Replace dots with dashes as they're not allowed in resource names
	name = strings.ReplaceAll(name, ".", "-")

	unique := sha256.Sum256([]byte(name))
	uniqueStr := hex.EncodeToString(unique[:])[:8]

	// characters left for the name once the "-<hash>" suffix is accounted for
	budget := maxLen - len(uniqueStr) - 1
	if budget <= 0 {
		switch {
		case maxLen <= 0:
			return ""
		case maxLen < len(uniqueStr):
			return uniqueStr[:maxLen]
		default:
			return uniqueStr
		}
	}

	if len(name) > budget {
		name = name[:budget]
	}

	return strings.ToLower(fmt.Sprintf("%s-%s", name, uniqueStr))
}
805

806
func (r *SkyhookReconciler) UpsertNodeLabelsAnnotationsPackages(ctx context.Context, skyhook *wrapper.Skyhook, node *corev1.Node) error {
1✔
807
        // No work to do if there is no labels or annotations for node
1✔
808
        if len(node.Labels) == 0 && len(node.Annotations) == 0 {
1✔
809
                return nil
×
810
        }
×
811

812
        annotations, err := json.Marshal(node.Annotations)
1✔
813
        if err != nil {
1✔
814
                return fmt.Errorf("error converting annotations into byte array: %w", err)
×
815
        }
×
816

817
        labels, err := json.Marshal(node.Labels)
1✔
818
        if err != nil {
1✔
819
                return fmt.Errorf("error converting labels into byte array: %w", err)
×
820
        }
×
821

822
        // marshal intermediary package metadata for the agent
823
        metadata := NewSkyhookMetadata(r.opts, skyhook)
1✔
824
        packages, err := metadata.Marshal()
1✔
825
        if err != nil {
1✔
826
                return fmt.Errorf("error converting packages into byte array: %w", err)
×
827
        }
×
828

829
        configMapName := generateSafeName(253, skyhook.Name, node.Name, "metadata")
1✔
830
        newCM := &corev1.ConfigMap{
1✔
831
                ObjectMeta: metav1.ObjectMeta{
1✔
832
                        Name:      configMapName,
1✔
833
                        Namespace: r.opts.Namespace,
1✔
834
                        Labels: map[string]string{
1✔
835
                                fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhook.Name,
1✔
836
                        },
1✔
837
                        Annotations: map[string]string{
1✔
838
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
1✔
839
                                fmt.Sprintf("%s/Node.name", v1alpha1.METADATA_PREFIX): node.Name,
1✔
840
                        },
1✔
841
                },
1✔
842
                Data: map[string]string{
1✔
843
                        "annotations.json": string(annotations),
1✔
844
                        "labels.json":      string(labels),
1✔
845
                        "packages.json":    string(packages),
1✔
846
                },
1✔
847
        }
1✔
848

1✔
849
        if err := ctrl.SetControllerReference(skyhook.Skyhook, newCM, r.scheme); err != nil {
1✔
850
                return fmt.Errorf("error setting ownership: %w", err)
×
851
        }
×
852

853
        existingConfigMap := &corev1.ConfigMap{}
1✔
854
        err = r.Get(ctx, client.ObjectKey{Namespace: r.opts.Namespace, Name: configMapName}, existingConfigMap)
1✔
855
        if err != nil {
2✔
856
                if apierrors.IsNotFound(err) {
2✔
857
                        // create
1✔
858
                        err := r.Create(ctx, newCM)
1✔
859
                        if err != nil {
1✔
860
                                return fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
861
                        }
×
862
                } else {
×
863
                        return fmt.Errorf("error getting config map: %w", err)
×
864
                }
×
865
        } else {
1✔
866
                if !reflect.DeepEqual(existingConfigMap.Data, newCM.Data) {
2✔
867
                        // update
1✔
868
                        err := r.Update(ctx, newCM)
1✔
869
                        if err != nil {
1✔
870
                                return fmt.Errorf("error updating config map [%s]: %w", newCM.Name, err)
×
871
                        }
×
872
                }
873
        }
874

875
        return nil
1✔
876
}
877

878
// HandleConfigUpdates checks whether the configMap on a package was updated and if it was the configmap will
879
// be updated and the package will be put into config mode if the package is complete or erroring
880
func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes, _package v1alpha1.Package, oldConfigMap, newConfigMap *corev1.ConfigMap) (bool, error) {
1✔
881
        completedNodes, nodeCount := 0, len(skyhook.GetNodes())
1✔
882
        erroringNode := false
1✔
883

1✔
884
        // if configmap changed
1✔
885
        if !reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) {
2✔
886
                for _, node := range skyhook.GetNodes() {
2✔
887
                        exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package)
1✔
888
                        if err != nil {
1✔
889
                                return false, err
×
890
                        }
×
891

892
                        if !exists && node.IsPackageComplete(_package) {
2✔
893
                                completedNodes++
1✔
894
                        }
1✔
895

896
                        // if we have an erroring node in the config, interrupt, or post-interrupt mode
897
                        // then we will restart the config changes
898
                        if packageStatus, found := node.PackageStatus(_package.GetUniqueName()); found {
2✔
899
                                switch packageStatus.Stage {
1✔
900
                                case v1alpha1.StageConfig, v1alpha1.StageInterrupt, v1alpha1.StagePostInterrupt:
1✔
901
                                        if packageStatus.State == v1alpha1.StateErroring {
1✔
902
                                                erroringNode = true
×
903

×
904
                                                // delete the erroring pod from the node so that it can be recreated
×
905
                                                // with the updated configmap
×
906
                                                pods, err := r.dal.GetPods(ctx,
×
907
                                                        client.MatchingFields{
×
908
                                                                "spec.nodeName": node.GetNode().Name,
×
909
                                                        },
×
910
                                                        client.MatchingLabels{
×
911
                                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.GetSkyhook().Name,
×
912
                                                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
×
913
                                                        },
×
914
                                                )
×
915
                                                if err != nil {
×
916
                                                        return false, err
×
917
                                                }
×
918

919
                                                if pods != nil {
×
920
                                                        for _, pod := range pods.Items {
×
921
                                                                err := r.Delete(ctx, &pod)
×
922
                                                                if err != nil {
×
923
                                                                        return false, err
×
924
                                                                }
×
925
                                                        }
926
                                                }
927
                                        }
928
                                }
929
                        }
930
                }
931

932
                // if the update is complete or there is an erroring node put the package back into
933
                // the config mode and update the config map
934
                if completedNodes == nodeCount || erroringNode {
2✔
935
                        // get the keys in the configmap that changed
1✔
936
                        newConfigUpdates := make([]string, 0)
1✔
937
                        for key, new_val := range newConfigMap.Data {
2✔
938
                                if old_val, exists := oldConfigMap.Data[key]; !exists || old_val != new_val {
2✔
939
                                        newConfigUpdates = append(newConfigUpdates, key)
1✔
940
                                }
1✔
941
                        }
942

943
                        // if updates completed then clear out old config updates as they are finished
944
                        if completedNodes == nodeCount {
2✔
945
                                skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
1✔
946
                        }
1✔
947

948
                        // Add the new changed keys to the config updates
949
                        skyhook.GetSkyhook().AddConfigUpdates(_package.Name, newConfigUpdates...)
1✔
950

1✔
951
                        for _, node := range skyhook.GetNodes() {
2✔
952
                                err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageConfig, 0, _package.ContainerSHA)
1✔
953
                                if err != nil {
1✔
954
                                        return false, fmt.Errorf("error upserting node status [%s]: %w", node.GetNode().Name, err)
×
955
                                }
×
956

957
                                node.SetStatus(v1alpha1.StatusInProgress)
1✔
958
                        }
959

960
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
1✔
961
                        if len(errs) > 0 {
1✔
962
                                return false, utilerrors.NewAggregate(errs)
×
963
                        }
×
964

965
                        // update config map
966
                        err := r.Update(ctx, newConfigMap)
1✔
967
                        if err != nil {
1✔
968
                                return false, fmt.Errorf("error updating config map [%s]: %w", newConfigMap.Name, err)
×
969
                        }
×
970

971
                        return true, nil
1✔
972
                }
973
        }
974

975
        return false, nil
1✔
976
}
977

978
func (r *SkyhookReconciler) UpsertConfigmaps(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, error) {
1✔
979
        updated := false
1✔
980

1✔
981
        var list corev1.ConfigMapList
1✔
982
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name})
1✔
983
        if err != nil {
1✔
984
                return false, fmt.Errorf("error listing config maps while upserting: %w", err)
×
985
        }
×
986

987
        existingCMs := make(map[string]corev1.ConfigMap)
1✔
988
        for _, cm := range list.Items {
2✔
989
                existingCMs[cm.Name] = cm
1✔
990
        }
1✔
991

992
        // clean up from an update
993
        shouldExist := make(map[string]struct{})
1✔
994
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
2✔
995
                shouldExist[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))] = struct{}{}
1✔
996
        }
1✔
997

998
        for k, v := range existingCMs {
2✔
999
                if _, ok := shouldExist[k]; !ok {
2✔
1000
                        // delete
1✔
1001
                        err := r.Delete(ctx, &v)
1✔
1002
                        if err != nil {
1✔
1003
                                return false, fmt.Errorf("error deleting existing config map [%s] while upserting: %w", v.Name, err)
×
1004
                        }
×
1005
                }
1006
        }
1007

1008
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
2✔
1009
                if len(_package.ConfigMap) > 0 {
2✔
1010

1✔
1011
                        newCM := &corev1.ConfigMap{
1✔
1012
                                ObjectMeta: metav1.ObjectMeta{
1✔
1013
                                        Name:      strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version)),
1✔
1014
                                        Namespace: r.opts.Namespace,
1✔
1015
                                        Labels: map[string]string{
1✔
1016
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
1✔
1017
                                        },
1✔
1018
                                        Annotations: map[string]string{
1✔
1019
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):            skyhook.GetSkyhook().Name,
1✔
1020
                                                fmt.Sprintf("%s/Package.Name", v1alpha1.METADATA_PREFIX):    _package.Name,
1✔
1021
                                                fmt.Sprintf("%s/Package.Version", v1alpha1.METADATA_PREFIX): _package.Version,
1✔
1022
                                        },
1✔
1023
                                },
1✔
1024
                                Data: _package.ConfigMap,
1✔
1025
                        }
1✔
1026
                        // set owner of CM to the SCR, which will clean up the CM in delete of the SCR
1✔
1027
                        if err := ctrl.SetControllerReference(skyhook.GetSkyhook().Skyhook, newCM, r.scheme); err != nil {
1✔
1028
                                return false, fmt.Errorf("error setting ownership of cm: %w", err)
×
1029
                        }
×
1030

1031
                        if existingCM, ok := existingCMs[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))]; ok {
2✔
1032
                                updatedConfigMap, err := r.HandleConfigUpdates(ctx, clusterState, skyhook, _package, &existingCM, newCM)
1✔
1033
                                if err != nil {
1✔
1034
                                        return false, fmt.Errorf("error updating config map [%s]: %s", newCM.Name, err)
×
1035
                                }
×
1036
                                if updatedConfigMap {
2✔
1037
                                        updated = true
1✔
1038
                                }
1✔
1039
                        } else {
1✔
1040
                                // create
1✔
1041
                                err := r.Create(ctx, newCM)
1✔
1042
                                if err != nil {
1✔
1043
                                        return false, fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
1044
                                }
×
1045
                        }
1046
                }
1047
        }
1048

1049
        return updated, nil
1✔
1050
}
1051

1052
func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
1✔
1053

1✔
1054
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
1✔
1055
                "spec.nodeName": skyhookNode.GetNode().Name,
1✔
1056
        })
1✔
1057
        if err != nil {
1✔
1058
                return false, err
×
1059
        }
×
1060

1061
        if pods == nil || len(pods.Items) == 0 {
1✔
1062
                return true, nil
×
1063
        }
×
1064

1065
        // checking for any running or pending pods with no toleration to unschedulable
1066
        // if its has an unschedulable toleration we can ignore
1067
        for _, pod := range pods.Items {
2✔
1068

1✔
1069
                if ShouldEvict(&pod) {
2✔
1070
                        return false, nil
1✔
1071
                }
1✔
1072

1073
        }
1074

1075
        return true, nil
1✔
1076
}
1077

1078
func ShouldEvict(pod *corev1.Pod) bool {
1✔
1079
        switch pod.Status.Phase {
1✔
1080
        case corev1.PodRunning, corev1.PodPending:
1✔
1081

1✔
1082
                for _, taint := range pod.Spec.Tolerations {
2✔
1083
                        switch taint.Key {
1✔
1084
                        case "node.kubernetes.io/unschedulable": // ignoring
1✔
1085
                                return false
1✔
1086
                        }
1087
                }
1088

1089
                if len(pod.ObjectMeta.OwnerReferences) > 1 {
1✔
1090
                        for _, owner := range pod.ObjectMeta.OwnerReferences {
×
1091
                                if owner.Kind == "DaemonSet" { // ignoring
×
1092
                                        return false
×
1093
                                }
×
1094
                        }
1095
                }
1096

1097
                if pod.GetNamespace() == "kube-system" {
1✔
1098
                        return false
×
1099
                }
×
1100

1101
                return true
1✔
1102
        }
1103
        return false
1✔
1104
}
1105

1106
// HandleFinalizer returns true only if we container is deleted and we handled it completely, else false
1107
func (r *SkyhookReconciler) HandleFinalizer(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
1✔
1108
        if skyhook.GetSkyhook().DeletionTimestamp.IsZero() { // if not deleted, and does not have our finalizer, add it
2✔
1109
                if !controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
2✔
1110
                        controllerutil.AddFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
1✔
1111

1✔
1112
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
1✔
1113
                                return false, fmt.Errorf("error updating skyhook to add finalizer: %w", err)
×
1114
                        }
×
1115
                }
1116
        } else { // being delete, time to handle our
1✔
1117
                if controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
2✔
1118

1✔
1119
                        errs := make([]error, 0)
1✔
1120

1✔
1121
                        // zero out all the metrics related to this skyhook both skyhook and packages
1✔
1122
                        zeroOutSkyhookMetrics(skyhook)
1✔
1123

1✔
1124
                        for _, node := range skyhook.GetNodes() {
2✔
1125
                                patch := client.StrategicMergeFrom(node.GetNode().DeepCopy())
1✔
1126

1✔
1127
                                node.Uncordon()
1✔
1128

1✔
1129
                                // if this doesn't change the node then don't patch
1✔
1130
                                if !node.Changed() {
2✔
1131
                                        continue
1✔
1132
                                }
1133

1134
                                err := r.Patch(ctx, node.GetNode(), patch)
1✔
1135
                                if err != nil {
1✔
1136
                                        errs = append(errs, fmt.Errorf("error patching node [%s] in finalizer: %w", node.GetNode().Name, err))
×
1137
                                }
×
1138
                        }
1139

1140
                        if len(errs) > 0 { // we errored, so we need to return error, otherwise we would release the skyhook when we didnt finish
1✔
1141
                                return false, utilerrors.NewAggregate(errs)
×
1142
                        }
×
1143

1144
                        controllerutil.RemoveFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
1✔
1145
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
2✔
1146
                                return false, fmt.Errorf("error updating skyhook removing finalizer: %w", err)
1✔
1147
                        }
1✔
1148
                        // should be 1, and now 2. we want to set ObservedGeneration up to not trigger an logic from this update adding the finalizer
1149
                        skyhook.GetSkyhook().Status.ObservedGeneration = skyhook.GetSkyhook().Status.ObservedGeneration + 1
1✔
1150

1✔
1151
                        if err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
2✔
1152
                                return false, fmt.Errorf("error updating skyhook status: %w", err)
1✔
1153
                        }
1✔
1154

1155
                        return true, nil
×
1156
                }
1157
        }
1158
        return false, nil
1✔
1159
}
1160

1161
// HasNonInterruptWork returns true if pods are running on the node that are either packages, or matches the SCR selector
1162
func (r *SkyhookReconciler) HasNonInterruptWork(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
1✔
1163

1✔
1164
        selector, err := metav1.LabelSelectorAsSelector(&skyhookNode.GetSkyhook().Spec.PodNonInterruptLabels)
1✔
1165
        if err != nil {
1✔
1166
                return false, fmt.Errorf("error creating selector: %w", err)
×
1167
        }
×
1168

1169
        if selector.Empty() { // when selector is empty it does not do any selecting, ie will return all pods on node.
2✔
1170
                return false, nil
1✔
1171
        }
1✔
1172

1173
        pods, err := r.dal.GetPods(ctx,
1✔
1174
                client.MatchingLabelsSelector{Selector: selector},
1✔
1175
                client.MatchingFields{
1✔
1176
                        "spec.nodeName": skyhookNode.GetNode().Name,
1✔
1177
                },
1✔
1178
        )
1✔
1179
        if err != nil {
1✔
1180
                return false, fmt.Errorf("error getting pods: %w", err)
×
1181
        }
×
1182

1183
        if pods == nil || len(pods.Items) == 0 {
2✔
1184
                return false, nil
1✔
1185
        }
1✔
1186

1187
        for _, pod := range pods.Items {
2✔
1188
                switch pod.Status.Phase {
1✔
1189
                case corev1.PodRunning, corev1.PodPending:
1✔
1190
                        return true, nil
1✔
1191
                }
1192
        }
1193

UNCOV
1194
        return false, nil
×
1195
}
1196

1197
func (r *SkyhookReconciler) HasRunningPackages(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
1✔
1198
        pods, err := r.dal.GetPods(ctx,
1✔
1199
                client.HasLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX)},
1✔
1200
                client.MatchingFields{
1✔
1201
                        "spec.nodeName": skyhookNode.GetNode().Name,
1✔
1202
                },
1✔
1203
        )
1✔
1204
        if err != nil {
1✔
1205
                return false, fmt.Errorf("error getting pods: %w", err)
×
1206
        }
×
1207

1208
        return pods != nil && len(pods.Items) > 0, nil
1✔
1209
}
1210

1211
func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
1✔
1212
        drained, err := r.IsDrained(ctx, skyhookNode)
1✔
1213
        if err != nil {
1✔
1214
                return false, err
×
1215
        }
×
1216
        if drained {
2✔
1217
                return true, nil
1✔
1218
        }
1✔
1219

1220
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
1✔
1221
                "spec.nodeName": skyhookNode.GetNode().Name,
1✔
1222
        })
1✔
1223
        if err != nil {
1✔
1224
                return false, err
×
1225
        }
×
1226

1227
        if pods == nil || len(pods.Items) == 0 {
1✔
1228
                return true, nil
×
1229
        }
×
1230

1231
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookInterrupt,
1✔
1232
                "draining node [%s] package [%s:%s] from [skyhook:%s]",
1✔
1233
                skyhookNode.GetNode().Name,
1✔
1234
                _package.Name,
1✔
1235
                _package.Version,
1✔
1236
                skyhookNode.GetSkyhook().Name,
1✔
1237
        )
1✔
1238

1✔
1239
        errs := make([]error, 0)
1✔
1240
        for _, pod := range pods.Items {
2✔
1241

1✔
1242
                if ShouldEvict(&pod) {
2✔
1243
                        eviction := policyv1.Eviction{}
1✔
1244
                        err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction)
1✔
1245
                        if err != nil {
1✔
1246
                                errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err))
×
1247
                        }
×
1248
                }
1249
        }
1250

1251
        return len(errs) == 0, utilerrors.NewAggregate(errs)
1✔
1252
}
1253

1254
// Interrupt should not be called unless safe to do so, IE already cordoned and drained
1255
func (r *SkyhookReconciler) Interrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, _interrupt *v1alpha1.Interrupt) error {
1✔
1256

1✔
1257
        hasPackagesRunning, err := r.HasRunningPackages(ctx, skyhookNode)
1✔
1258
        if err != nil {
1✔
1259
                return err
×
1260
        }
×
1261

1262
        if hasPackagesRunning { // keep waiting...
2✔
1263
                return nil
1✔
1264
        }
1✔
1265

1266
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
1✔
1267
        if err != nil {
1✔
1268
                return err
×
1269
        }
×
1270
        if exists {
1✔
1271
                // nothing to do here, already running
×
1272
                return nil
×
1273
        }
×
1274

1275
        argEncode, err := _interrupt.ToArgs()
1✔
1276
        if err != nil {
1✔
1277
                return fmt.Errorf("error creating interrupt args: %w", err)
×
1278
        }
×
1279

1280
        pod := createInterruptPodForPackage(r.opts, _interrupt, argEncode, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name)
1✔
1281

1✔
1282
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, v1alpha1.StageInterrupt, _package); err != nil {
1✔
1283
                return fmt.Errorf("error setting package on interrupt: %w", err)
×
1284
        }
×
1285

1286
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
1✔
1287
                return fmt.Errorf("error setting ownership: %w", err)
×
1288
        }
×
1289

1290
        if err := r.Create(ctx, pod); err != nil {
1✔
1291
                return fmt.Errorf("error creating interruption pod: %w", err)
×
1292
        }
×
1293

1294
        _ = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageInterrupt, 0, _package.ContainerSHA)
1✔
1295

1✔
1296
        r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonSkyhookInterrupt,
1✔
1297
                "Interrupting node [%s] package [%s:%s] from [skyhook:%s]",
1✔
1298
                skyhookNode.GetNode().Name,
1✔
1299
                _package.Name,
1✔
1300
                _package.Version,
1✔
1301
                skyhookNode.GetSkyhook().Name)
1✔
1302

1✔
1303
        return nil
1✔
1304
}
1305

1306
// fudgeInterruptWithPriority takes a list of packages, interrupts, and configUpdates and returns the correct merged interrupt to run to handle all the packages
1307
func fudgeInterruptWithPriority(next []*v1alpha1.Package, configUpdates map[string][]string, interrupts map[string][]*v1alpha1.Interrupt) (*v1alpha1.Interrupt, string) {
1✔
1308
        var ret *v1alpha1.Interrupt
1✔
1309
        var pack string
1✔
1310

1✔
1311
        // map interrupt to priority
1✔
1312
        // A lower priority value means a higher priority and will be used in favor of anything with a higher value
1✔
1313
        var priorities = map[v1alpha1.InterruptType]int{
1✔
1314
                v1alpha1.REBOOT:               0,
1✔
1315
                v1alpha1.RESTART_ALL_SERVICES: 1,
1✔
1316
                v1alpha1.SERVICE:              2,
1✔
1317
                v1alpha1.NOOP:                 3,
1✔
1318
        }
1✔
1319

1✔
1320
        for _, _package := range next {
2✔
1321

1✔
1322
                if len(configUpdates[_package.Name]) == 0 {
2✔
1323
                        interrupts[_package.Name] = []*v1alpha1.Interrupt{}
1✔
1324
                        if _package.HasInterrupt() {
2✔
1325
                                interrupts[_package.Name] = append(interrupts[_package.Name], _package.Interrupt)
1✔
1326
                        }
1✔
1327
                }
1328
        }
1329

1330
        packageNames := make([]string, 0)
1✔
1331
        for _, pkg := range next {
2✔
1332
                packageNames = append(packageNames, pkg.Name)
1✔
1333
        }
1✔
1334
        sort.Strings(packageNames)
1✔
1335

1✔
1336
        for _, _package := range packageNames {
2✔
1337
                _interrupts, ok := interrupts[_package]
1✔
1338
                if !ok {
2✔
1339
                        continue
1✔
1340
                }
1341

1342
                for _, interrupt := range _interrupts {
2✔
1343
                        if ret == nil { // prime ret, base case
2✔
1344
                                ret = interrupt
1✔
1345
                                pack = _package
1✔
1346
                        }
1✔
1347

1348
                        // short circuit, reboot has highest priority
1349
                        switch interrupt.Type {
1✔
1350
                        case v1alpha1.REBOOT:
1✔
1351
                                return interrupt, _package
1✔
1352
                        }
1353

1354
                        // check if interrupt is higher priority using the priority_order
1355
                        // A lower priority value means a higher priority
1356
                        if priorities[interrupt.Type] < priorities[ret.Type] {
2✔
1357
                                ret = interrupt
1✔
1358
                                pack = _package
1✔
1359
                        } else if priorities[interrupt.Type] == priorities[ret.Type] {
3✔
1360
                                mergeInterrupt(ret, interrupt)
1✔
1361
                        }
1✔
1362
                }
1363
        }
1364

1365
        return ret, pack // return merged interrupt and package
1✔
1366
}
1367

1368
func mergeInterrupt(left, right *v1alpha1.Interrupt) {
1✔
1369

1✔
1370
        // make sure both are of type service
1✔
1371
        if left.Type != v1alpha1.SERVICE || right.Type != v1alpha1.SERVICE {
2✔
1372
                return
1✔
1373
        }
1✔
1374

1375
        left.Services = merge(left.Services, right.Services)
1✔
1376
}
1377

1378
// merge appends to left every element of right that left does not already
// contain, sorts the result in ascending order, and returns it.
// The sort is done in place, so the caller's left slice may be reordered.
func merge[T cmp.Ordered](left, right []T) []T {
	for i := range right {
		if slices.Index(left, right[i]) < 0 {
			left = append(left, right[i])
		}
	}
	slices.Sort(left)
	return left
}
1387

1388
// ValidateNodeConfigmaps validates that there are no orphaned or stale config maps for a node
1389
func (r *SkyhookReconciler) ValidateNodeConfigmaps(ctx context.Context, skyhookName string, nodes []wrapper.SkyhookNode) (bool, error) {
1✔
1390
        var list corev1.ConfigMapList
1✔
1391
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhookName})
1✔
1392
        if err != nil {
1✔
1393
                return false, fmt.Errorf("error listing config maps: %w", err)
×
1394
        }
×
1395

1396
        // No configmaps created by this skyhook, no work needs to be done
1397
        if len(list.Items) == 0 {
2✔
1398
                return false, nil
1✔
1399
        }
1✔
1400

1401
        existingCMs := make(map[string]corev1.ConfigMap)
1✔
1402
        for _, cm := range list.Items {
2✔
1403
                existingCMs[cm.Name] = cm
1✔
1404
        }
1✔
1405

1406
        shouldExist := make(map[string]struct{})
1✔
1407
        for _, node := range nodes {
2✔
1408
                shouldExist[generateSafeName(253, skyhookName, node.GetNode().Name, "metadata")] = struct{}{}
1✔
1409
        }
1✔
1410

1411
        update := false
1✔
1412
        errs := make([]error, 0)
1✔
1413
        for k, v := range existingCMs {
2✔
1414
                if _, ok := shouldExist[k]; !ok {
1✔
1415
                        update = true
×
1416
                        err := r.Delete(ctx, &v)
×
1417
                        if err != nil {
×
1418
                                errs = append(errs, fmt.Errorf("error deleting existing config map [%s]: %w", v.Name, err))
×
1419
                        }
×
1420
                }
1421
        }
1422

1423
        // Ensure packages.json is present and up-to-date for expected configmaps
1424
        skyhookCR, err := r.dal.GetSkyhook(ctx, skyhookName)
1✔
1425
        if err != nil {
1✔
1426
                return update, fmt.Errorf("error getting skyhook for metadata validation: %w", err)
×
1427
        }
×
1428
        skyhookWrapper := wrapper.NewSkyhookWrapper(skyhookCR)
1✔
1429
        metadata := NewSkyhookMetadata(r.opts, skyhookWrapper)
1✔
1430
        expectedBytes, err := metadata.Marshal()
1✔
1431
        if err != nil {
1✔
1432
                return update, fmt.Errorf("error marshalling metadata for validation: %w", err)
×
1433
        }
×
1434
        expected := string(expectedBytes)
1✔
1435

1✔
1436
        for i := range list.Items {
2✔
1437
                cm := &list.Items[i]
1✔
1438
                if _, ok := shouldExist[cm.Name]; !ok {
1✔
1439
                        continue
×
1440
                }
1441
                if cm.Data == nil {
1✔
1442
                        cm.Data = make(map[string]string)
×
1443
                }
×
1444
                if cm.Data["packages.json"] != expected {
2✔
1445
                        cm.Data["packages.json"] = expected
1✔
1446
                        if err := r.Update(ctx, cm); err != nil {
2✔
1447
                                errs = append(errs, fmt.Errorf("error updating packages.json on config map [%s]: %w", cm.Name, err))
1✔
1448
                        } else {
2✔
1449
                                update = true
1✔
1450
                        }
1✔
1451
                }
1452
        }
1453

1454
        return update, utilerrors.NewAggregate(errs)
1✔
1455
}
1456

1457
// PodExists tests if this package is exists on a node.
1458
func (r *SkyhookReconciler) PodExists(ctx context.Context, nodeName, skyhookName string, _package *v1alpha1.Package) (bool, error) {
1✔
1459

1✔
1460
        pods, err := r.dal.GetPods(ctx,
1✔
1461
                client.MatchingFields{
1✔
1462
                        "spec.nodeName": nodeName,
1✔
1463
                },
1✔
1464
                client.MatchingLabels{
1✔
1465
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhookName,
1✔
1466
                        fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
1✔
1467
                },
1✔
1468
        )
1✔
1469
        if err != nil {
1✔
1470
                return false, fmt.Errorf("error check from existing pods: %w", err)
×
1471
        }
×
1472

1473
        if pods == nil || len(pods.Items) == 0 {
2✔
1474
                return false, nil
1✔
1475
        }
1✔
1476
        return true, nil
1✔
1477
}
1478

1479
// createInterruptPodForPackage returns the pod spec for an interrupt pod given an package
1480
func createInterruptPodForPackage(opts SkyhookOperatorOptions, _interrupt *v1alpha1.Interrupt, argEncode string, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string) *corev1.Pod {
1✔
1481
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
1✔
1482
                opts.CopyDirRoot,
1✔
1483
                skyhook.Name,
1✔
1484
                _package.Name,
1✔
1485
                _package.Version,
1✔
1486
                skyhook.UID,
1✔
1487
                skyhook.Generation,
1✔
1488
        )
1✔
1489

1✔
1490
        volumes := []corev1.Volume{
1✔
1491
                {
1✔
1492
                        Name: "root-mount",
1✔
1493
                        VolumeSource: corev1.VolumeSource{
1✔
1494
                                HostPath: &corev1.HostPathVolumeSource{
1✔
1495
                                        Path: "/",
1✔
1496
                                },
1✔
1497
                        },
1✔
1498
                },
1✔
1499
                {
1✔
1500
                        // node names in different CSPs might include dots which isn't allowed in volume names
1✔
1501
                        // so we have to replace all dots with dashes
1✔
1502
                        Name: generateSafeName(63, skyhook.Name, nodeName, "metadata"),
1✔
1503
                        VolumeSource: corev1.VolumeSource{
1✔
1504
                                ConfigMap: &corev1.ConfigMapVolumeSource{
1✔
1505
                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
1506
                                                Name: strings.ReplaceAll(fmt.Sprintf("%s-%s-metadata", skyhook.Name, nodeName), ".", "-"),
1✔
1507
                                        },
1✔
1508
                                },
1✔
1509
                        },
1✔
1510
                },
1✔
1511
        }
1✔
1512
        volumeMounts := []corev1.VolumeMount{
1✔
1513
                {
1✔
1514
                        Name:             "root-mount",
1✔
1515
                        MountPath:        "/root",
1✔
1516
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
1✔
1517
                },
1✔
1518
        }
1✔
1519

1✔
1520
        pod := &corev1.Pod{
1✔
1521
                ObjectMeta: metav1.ObjectMeta{
1✔
1522
                        Name:      generateSafeName(63, skyhook.Name, "interrupt", string(_interrupt.Type), nodeName),
1✔
1523
                        Namespace: opts.Namespace,
1✔
1524
                        Labels: map[string]string{
1✔
1525
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
1✔
1526
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX):   fmt.Sprintf("%s-%s", _package.Name, _package.Version),
1✔
1527
                                fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX): "True",
1✔
1528
                        },
1✔
1529
                },
1✔
1530
                Spec: corev1.PodSpec{
1✔
1531
                        NodeName:      nodeName,
1✔
1532
                        RestartPolicy: corev1.RestartPolicyOnFailure,
1✔
1533
                        InitContainers: []corev1.Container{
1✔
1534
                                {
1✔
1535
                                        Name:  InterruptContainerName,
1✔
1536
                                        Image: getAgentImage(opts, _package),
1✔
1537
                                        Args:  []string{"interrupt", "/root", copyDir, argEncode},
1✔
1538
                                        Env:   getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name),
1✔
1539
                                        SecurityContext: &corev1.SecurityContext{
1✔
1540
                                                Privileged: ptr(true),
1✔
1541
                                        },
1✔
1542
                                        VolumeMounts: volumeMounts,
1✔
1543
                                        Resources: corev1.ResourceRequirements{
1✔
1544
                                                Limits: corev1.ResourceList{
1✔
1545
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
1✔
1546
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
1✔
1547
                                                },
1✔
1548
                                                Requests: corev1.ResourceList{
1✔
1549
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
1✔
1550
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
1✔
1551
                                                },
1✔
1552
                                        },
1✔
1553
                                },
1✔
1554
                        },
1✔
1555
                        Containers: []corev1.Container{
1✔
1556
                                {
1✔
1557
                                        Name:  "pause",
1✔
1558
                                        Image: opts.PauseImage,
1✔
1559
                                        Resources: corev1.ResourceRequirements{
1✔
1560
                                                Limits: corev1.ResourceList{
1✔
1561
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
1✔
1562
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
1✔
1563
                                                },
1✔
1564
                                                Requests: corev1.ResourceList{
1✔
1565
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
1✔
1566
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
1✔
1567
                                                },
1✔
1568
                                        },
1✔
1569
                                },
1✔
1570
                        },
1✔
1571
                        ImagePullSecrets: []corev1.LocalObjectReference{
1✔
1572
                                {
1✔
1573
                                        Name: opts.ImagePullSecret,
1✔
1574
                                },
1✔
1575
                        },
1✔
1576
                        HostPID:     true,
1✔
1577
                        HostNetwork: true,
1✔
1578
                        // If you change these go change the SelectNode toleration in cluster_state.go
1✔
1579
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
1✔
1580
                                {
1✔
1581
                                        Key:      TaintUnschedulable,
1✔
1582
                                        Operator: corev1.TolerationOpExists,
1✔
1583
                                },
1✔
1584
                                opts.GetRuntimeRequiredToleration(),
1✔
1585
                        }, skyhook.Spec.AdditionalTolerations...),
1✔
1586
                        Volumes: volumes,
1✔
1587
                },
1✔
1588
        }
1✔
1589
        return pod
1✔
1590
}
1✔
1591

1592
// trunstr returns str cut down to at most length bytes; strings that already
// fit are returned unchanged. Truncation is byte-based, not rune-based.
func trunstr(str string, length int) string {
	if len(str) <= length {
		return str
	}
	return str[:length]
}
1598

1599
func getAgentImage(opts SkyhookOperatorOptions, _package *v1alpha1.Package) string {
1✔
1600
        if _package.AgentImageOverride != "" {
2✔
1601
                return _package.AgentImageOverride
1✔
1602
        }
1✔
1603
        return opts.AgentImage
1✔
1604
}
1605

1606
// getPackageImage returns the full image reference for a package, using the digest if specified
1607
func getPackageImage(_package *v1alpha1.Package) string {
1✔
1608
        if _package.ContainerSHA != "" {
2✔
1609
                // When containerSHA is specified, use it instead of the version tag for immutable image reference
1✔
1610
                return fmt.Sprintf("%s@%s", _package.Image, _package.ContainerSHA)
1✔
1611
        }
1✔
1612
        // Fall back to version tag
1613
        return fmt.Sprintf("%s:%s", _package.Image, _package.Version)
1✔
1614
}
1615

1616
func getAgentConfigEnvVars(opts SkyhookOperatorOptions, packageName string, packageVersion string, resourceID string, skyhookName string) []corev1.EnvVar {
1✔
1617
        return []corev1.EnvVar{
1✔
1618
                {
1✔
1619
                        Name:  "SKYHOOK_LOG_DIR",
1✔
1620
                        Value: fmt.Sprintf("%s/%s", opts.AgentLogRoot, skyhookName),
1✔
1621
                },
1✔
1622
                {
1✔
1623
                        Name:  "SKYHOOK_ROOT_DIR",
1✔
1624
                        Value: fmt.Sprintf("%s/%s", opts.CopyDirRoot, skyhookName),
1✔
1625
                },
1✔
1626
                {
1✔
1627
                        Name:  "COPY_RESOLV",
1✔
1628
                        Value: "false",
1✔
1629
                },
1✔
1630
                {
1✔
1631
                        Name:  "SKYHOOK_RESOURCE_ID",
1✔
1632
                        Value: fmt.Sprintf("%s_%s_%s", resourceID, packageName, packageVersion),
1✔
1633
                },
1✔
1634
        }
1✔
1635
}
1✔
1636

1637
// createPodFromPackage creates a pod spec for a skyhook pod for a given package
1638
func createPodFromPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string, stage v1alpha1.Stage) *corev1.Pod {
1✔
1639
        // Generate consistent names that won't exceed k8s limits
1✔
1640
        volumeName := generateSafeName(63, "metadata", nodeName)
1✔
1641
        configMapName := generateSafeName(253, skyhook.Name, nodeName, "metadata")
1✔
1642

1✔
1643
        volumes := []corev1.Volume{
1✔
1644
                {
1✔
1645
                        Name: "root-mount",
1✔
1646
                        VolumeSource: corev1.VolumeSource{
1✔
1647
                                HostPath: &corev1.HostPathVolumeSource{
1✔
1648
                                        Path: "/",
1✔
1649
                                },
1✔
1650
                        },
1✔
1651
                },
1✔
1652
                {
1✔
1653
                        Name: volumeName,
1✔
1654
                        VolumeSource: corev1.VolumeSource{
1✔
1655
                                ConfigMap: &corev1.ConfigMapVolumeSource{
1✔
1656
                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
1657
                                                Name: configMapName,
1✔
1658
                                        },
1✔
1659
                                },
1✔
1660
                        },
1✔
1661
                },
1✔
1662
        }
1✔
1663

1✔
1664
        volumeMounts := []corev1.VolumeMount{
1✔
1665
                {
1✔
1666
                        Name:             "root-mount",
1✔
1667
                        MountPath:        "/root",
1✔
1668
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
1✔
1669
                },
1✔
1670
                {
1✔
1671
                        Name:      volumeName,
1✔
1672
                        MountPath: "/skyhook-package/node-metadata",
1✔
1673
                },
1✔
1674
        }
1✔
1675

1✔
1676
        if len(_package.ConfigMap) > 0 {
2✔
1677
                volumeMounts = append(volumeMounts, corev1.VolumeMount{
1✔
1678
                        Name:      _package.Name,
1✔
1679
                        MountPath: "/skyhook-package/configmaps",
1✔
1680
                })
1✔
1681

1✔
1682
                volumes = append(volumes, corev1.Volume{
1✔
1683
                        Name: _package.Name,
1✔
1684
                        VolumeSource: corev1.VolumeSource{
1✔
1685
                                ConfigMap: &corev1.ConfigMapVolumeSource{
1✔
1686
                                        LocalObjectReference: corev1.LocalObjectReference{
1✔
1687
                                                Name: strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.Name, _package.Name, _package.Version)),
1✔
1688
                                        },
1✔
1689
                                },
1✔
1690
                        },
1✔
1691
                })
1✔
1692
        }
1✔
1693

1694
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
1✔
1695
                opts.CopyDirRoot,
1✔
1696
                skyhook.Name,
1✔
1697
                _package.Name,
1✔
1698
                _package.Version,
1✔
1699
                skyhook.UID,
1✔
1700
                skyhook.Generation,
1✔
1701
        )
1✔
1702
        applyargs := []string{strings.ToLower(string(stage)), "/root", copyDir}
1✔
1703
        checkargs := []string{strings.ToLower(string(stage) + "-check"), "/root", copyDir}
1✔
1704

1✔
1705
        agentEnvs := append(
1✔
1706
                _package.Env,
1✔
1707
                getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name)...,
1✔
1708
        )
1✔
1709

1✔
1710
        pod := &corev1.Pod{
1✔
1711
                ObjectMeta: metav1.ObjectMeta{
1✔
1712
                        Name:      generateSafeName(63, skyhook.Name, _package.Name, _package.Version, string(stage), nodeName),
1✔
1713
                        Namespace: opts.Namespace,
1✔
1714
                        Labels: map[string]string{
1✔
1715
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.Name,
1✔
1716
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
1✔
1717
                        },
1✔
1718
                },
1✔
1719
                Spec: corev1.PodSpec{
1✔
1720
                        NodeName:      nodeName,
1✔
1721
                        RestartPolicy: corev1.RestartPolicyOnFailure,
1✔
1722
                        InitContainers: []corev1.Container{
1✔
1723
                                {
1✔
1724
                                        Name:            fmt.Sprintf("%s-init", trunstr(_package.Name, 43)),
1✔
1725
                                        Image:           getPackageImage(_package),
1✔
1726
                                        ImagePullPolicy: "Always",
1✔
1727
                                        Command:         []string{"/bin/sh"},
1✔
1728
                                        Args: []string{
1✔
1729
                                                "-c",
1✔
1730
                                                "mkdir -p /root/${SKYHOOK_DIR} && cp -r /skyhook-package/* /root/${SKYHOOK_DIR}",
1✔
1731
                                        },
1✔
1732
                                        Env: []corev1.EnvVar{
1✔
1733
                                                {
1✔
1734
                                                        Name:  "SKYHOOK_DIR",
1✔
1735
                                                        Value: copyDir,
1✔
1736
                                                },
1✔
1737
                                        },
1✔
1738
                                        SecurityContext: &corev1.SecurityContext{
1✔
1739
                                                Privileged: ptr(true),
1✔
1740
                                        },
1✔
1741
                                        VolumeMounts: volumeMounts,
1✔
1742
                                },
1✔
1743
                                {
1✔
1744
                                        Name:            fmt.Sprintf("%s-%s", trunstr(_package.Name, 43), stage),
1✔
1745
                                        Image:           getAgentImage(opts, _package),
1✔
1746
                                        ImagePullPolicy: "Always",
1✔
1747
                                        Args:            applyargs,
1✔
1748
                                        Env:             agentEnvs,
1✔
1749
                                        SecurityContext: &corev1.SecurityContext{
1✔
1750
                                                Privileged: ptr(true),
1✔
1751
                                        },
1✔
1752
                                        VolumeMounts: volumeMounts,
1✔
1753
                                },
1✔
1754
                                {
1✔
1755
                                        Name:            fmt.Sprintf("%s-%scheck", trunstr(_package.Name, 43), stage),
1✔
1756
                                        Image:           getAgentImage(opts, _package),
1✔
1757
                                        ImagePullPolicy: "Always",
1✔
1758
                                        Args:            checkargs,
1✔
1759
                                        Env:             agentEnvs,
1✔
1760
                                        SecurityContext: &corev1.SecurityContext{
1✔
1761
                                                Privileged: ptr(true),
1✔
1762
                                        },
1✔
1763
                                        VolumeMounts: volumeMounts,
1✔
1764
                                },
1✔
1765
                        },
1✔
1766
                        Containers: []corev1.Container{
1✔
1767
                                {
1✔
1768
                                        Name:  "pause",
1✔
1769
                                        Image: opts.PauseImage,
1✔
1770
                                        Resources: corev1.ResourceRequirements{
1✔
1771
                                                Limits: corev1.ResourceList{
1✔
1772
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
1✔
1773
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
1✔
1774
                                                },
1✔
1775
                                                Requests: corev1.ResourceList{
1✔
1776
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
1✔
1777
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
1✔
1778
                                                },
1✔
1779
                                        },
1✔
1780
                                },
1✔
1781
                        },
1✔
1782
                        ImagePullSecrets: []corev1.LocalObjectReference{
1✔
1783
                                {
1✔
1784
                                        Name: opts.ImagePullSecret,
1✔
1785
                                },
1✔
1786
                        },
1✔
1787
                        Volumes:     volumes,
1✔
1788
                        HostPID:     true,
1✔
1789
                        HostNetwork: true,
1✔
1790
                        // If you change these go change the SelectNode toleration in cluster_state.go
1✔
1791
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
1✔
1792
                                {
1✔
1793
                                        Key:      TaintUnschedulable,
1✔
1794
                                        Operator: corev1.TolerationOpExists,
1✔
1795
                                },
1✔
1796
                                opts.GetRuntimeRequiredToleration(),
1✔
1797
                        }, skyhook.Spec.AdditionalTolerations...),
1✔
1798
                },
1✔
1799
        }
1✔
1800
        if _package.GracefulShutdown != nil {
2✔
1801
                pod.Spec.TerminationGracePeriodSeconds = ptr(int64(_package.GracefulShutdown.Duration.Seconds()))
1✔
1802
        }
1✔
1803
        setPodResources(pod, _package.Resources)
1✔
1804
        return pod
1✔
1805
}
1806

1807
// FilterEnv removes the environment variables passed into exlude
1808
func FilterEnv(envs []corev1.EnvVar, exclude ...string) []corev1.EnvVar {
1✔
1809
        var filteredEnv []corev1.EnvVar
1✔
1810

1✔
1811
        // build map of exclude strings for faster lookup
1✔
1812
        excludeMap := make(map[string]struct{})
1✔
1813
        for _, name := range exclude {
2✔
1814
                excludeMap[name] = struct{}{}
1✔
1815
        }
1✔
1816

1817
        // If the environment variable name is in the exclude list, skip it
1818
        // otherwise append it to the final list
1819
        for _, env := range envs {
2✔
1820
                if _, found := excludeMap[env.Name]; !found {
2✔
1821
                        filteredEnv = append(filteredEnv, env)
1✔
1822
                }
1✔
1823
        }
1824

1825
        return filteredEnv
1✔
1826
}
1827

1828
// PodMatchesPackage asserts that a given pod matches the given pod spec
1829
func podMatchesPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, pod corev1.Pod, skyhook *wrapper.Skyhook, stage v1alpha1.Stage) bool {
1✔
1830
        var expectedPod *corev1.Pod
1✔
1831

1✔
1832
        // need to differentiate whether the pod is for an interrupt or not so we know
1✔
1833
        // what to expect and how to compare them
1✔
1834
        isInterrupt := false
1✔
1835
        _, limitRange := pod.Annotations["kubernetes.io/limit-ranger"]
1✔
1836

1✔
1837
        if pod.Labels[fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX)] == "True" {
2✔
1838
                expectedPod = createInterruptPodForPackage(opts, &v1alpha1.Interrupt{}, "", _package, skyhook, "")
1✔
1839
                isInterrupt = true
1✔
1840
        } else {
2✔
1841
                expectedPod = createPodFromPackage(opts, _package, skyhook, "", stage)
1✔
1842
        }
1✔
1843

1844
        actualPod := pod.DeepCopy()
1✔
1845

1✔
1846
        // check to see whether the name or the version of the package changed
1✔
1847
        packageLabel := fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX)
1✔
1848
        if actualPod.Labels[packageLabel] != expectedPod.Labels[packageLabel] {
2✔
1849
                return false
1✔
1850
        }
1✔
1851

1852
        // compare initContainers since this is where a lot of the important info lives
1853
        for i := range actualPod.Spec.InitContainers {
2✔
1854
                expectedContainer := expectedPod.Spec.InitContainers[i]
1✔
1855
                actualContainer := actualPod.Spec.InitContainers[i]
1✔
1856

1✔
1857
                if expectedContainer.Name != actualContainer.Name {
2✔
1858
                        return false
1✔
1859
                }
1✔
1860

1861
                if expectedContainer.Image != actualContainer.Image {
1✔
1862
                        return false
×
1863
                }
×
1864

1865
                // compare the containers env vars except for the ones that are inserted
1866
                // by the operator by default as the SKYHOOK_RESOURCE_ID will change every
1867
                // time the skyhook is updated and would cause every pod to be removed
1868
                // TODO: This is ignoring all the static env vars that are set by operator config.
1869
                // It probably should be just SKYHOOK_RESOURCE_ID that is ignored. Otherwise,
1870
                // a user will have to manually delete the pod to update the package when operator is updated.
1871
                dummyAgentEnv := getAgentConfigEnvVars(opts, "", "", "", "")
1✔
1872
                excludedEnvs := make([]string, len(dummyAgentEnv))
1✔
1873
                for i, env := range dummyAgentEnv {
2✔
1874
                        excludedEnvs[i] = env.Name
1✔
1875
                }
1✔
1876
                expectedFilteredEnv := FilterEnv(expectedContainer.Env, excludedEnvs...)
1✔
1877
                actualFilteredEnv := FilterEnv(actualContainer.Env, excludedEnvs...)
1✔
1878
                if !reflect.DeepEqual(expectedFilteredEnv, actualFilteredEnv) {
2✔
1879
                        return false
1✔
1880
                }
1✔
1881

1882
                if !isInterrupt { // dont compare these since they are not configured on interrupt
2✔
1883
                        // compare resource requests and limits (CPU, memory, etc.)
1✔
1884
                        expectedResources := expectedContainer.Resources
1✔
1885
                        actualResources := actualContainer.Resources
1✔
1886
                        if skyhook.Spec.Packages[_package.Name].Resources != nil {
2✔
1887
                                // If CR has resources specified, they should match exactly
1✔
1888
                                if !reflect.DeepEqual(expectedResources, actualResources) {
2✔
1889
                                        return false
1✔
1890
                                }
1✔
1891
                        } else {
1✔
1892
                                // If CR has no resources specified, ensure pod has no resource overrides
1✔
1893
                                if !limitRange {
2✔
1894
                                        if actualResources.Requests != nil || actualResources.Limits != nil {
2✔
1895
                                                return false
1✔
1896
                                        }
1✔
1897
                                }
1898
                        }
1899
                }
1900
        }
1901

1902
        return true
1✔
1903
}
1904

1905
// ValidateRunningPackages deletes pods that don't match the current spec and checks if there are pods running
1906
// that don't match the node state and removes them if they exist
1907
func (r *SkyhookReconciler) ValidateRunningPackages(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
1✔
1908

1✔
1909
        update := false
1✔
1910
        errs := make([]error, 0)
1✔
1911
        // get all pods for this skyhook packages
1✔
1912
        pods, err := r.dal.GetPods(ctx,
1✔
1913
                client.MatchingLabels{
1✔
1914
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
1✔
1915
                },
1✔
1916
        )
1✔
1917
        if err != nil {
1✔
1918
                return false, fmt.Errorf("error getting pods while validating packages: %w", err)
×
1919
        }
×
1920
        if pods == nil || len(pods.Items) == 0 {
2✔
1921
                return false, nil // nothing running for this skyhook on this node
1✔
1922
        }
1✔
1923

1924
        // Initialize metrics for each stage
1925
        stages := make(map[string]map[string]map[v1alpha1.Stage]int)
1✔
1926

1✔
1927
        // group pods by node
1✔
1928
        podsbyNode := make(map[string][]corev1.Pod)
1✔
1929
        for _, pod := range pods.Items {
2✔
1930
                podsbyNode[pod.Spec.NodeName] = append(podsbyNode[pod.Spec.NodeName], pod)
1✔
1931
        }
1✔
1932

1933
        for _, node := range skyhook.GetNodes() {
2✔
1934
                nodeState, err := node.State()
1✔
1935
                if err != nil {
1✔
1936
                        return false, fmt.Errorf("error getting node state: %w", err)
×
1937
                }
×
1938

1939
                for _, pod := range podsbyNode[node.GetNode().Name] {
2✔
1940
                        found := false
1✔
1941

1✔
1942
                        runningPackage, err := GetPackage(&pod)
1✔
1943
                        if err != nil {
1✔
1944
                                errs = append(errs, fmt.Errorf("error getting package from pod [%s:%s] while validating packages: %w", pod.Namespace, pod.Name, err))
×
1945
                        }
×
1946

1947
                        // check if the package is part of the skyhook spec, if not we need to delete it
1948
                        for _, v := range skyhook.GetSkyhook().Spec.Packages {
2✔
1949
                                if podMatchesPackage(r.opts, &v, pod, skyhook.GetSkyhook(), runningPackage.Stage) {
2✔
1950
                                        found = true
1✔
1951
                                }
1✔
1952
                        }
1953

1954
                        // Increment the stage count for metrics
1955
                        if _, ok := stages[runningPackage.Name]; !ok {
2✔
1956
                                stages[runningPackage.Name] = make(map[string]map[v1alpha1.Stage]int)
1✔
1957
                                if _, ok := stages[runningPackage.Name][runningPackage.Version]; !ok {
2✔
1958
                                        stages[runningPackage.Name][runningPackage.Version] = make(map[v1alpha1.Stage]int)
1✔
1959
                                        for _, stage := range v1alpha1.Stages {
2✔
1960
                                                stages[runningPackage.Name][runningPackage.Version][stage] = 0
1✔
1961
                                        }
1✔
1962
                                }
1963
                        }
1964
                        stages[runningPackage.Name][runningPackage.Version][runningPackage.Stage]++
1✔
1965

1✔
1966
                        // uninstall is by definition not part of the skyhook spec, so we cant delete it (because it used to be but was removed, hence uninstalling it)
1✔
1967
                        if runningPackage.Stage == v1alpha1.StageUninstall {
2✔
1968
                                found = true
1✔
1969
                        }
1✔
1970

1971
                        if !found {
2✔
1972
                                update = true
1✔
1973

1✔
1974
                                err := r.InvalidPackage(ctx, &pod)
1✔
1975
                                if err != nil {
2✔
1976
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
1✔
1977
                                }
1✔
1978
                                continue
1✔
1979
                        }
1980

1981
                        // Check if package exists in node state, ie a package running that the node state doesn't know about
1982
                        // something that is often done to try to fix bad node state is to clear the node state completely
1983
                        // which if a package is running, we want to terminate it gracefully. Ofthen what leads to this is
1984
                        // the package is in a crashloop and the operator want to restart it the whole package.
1985
                        // when we apply a package it just check if there is a running package on the node for the state of the package
1986
                        // this can cause to leave a pod running in say config mode, and it there is a depends on you might not correctly
1987
                        // run thins in the correct order.
1988
                        deleteMe := false
1✔
1989
                        packageStatus, exists := nodeState[runningPackage.GetUniqueName()]
1✔
1990
                        if !exists { // package not in node state, so we need to delete it
2✔
1991
                                deleteMe = true
1✔
1992
                        } else { // package in node state, so we need to check if it's running
2✔
1993
                                // need check if the stats match, if not we need to delete it
1✔
1994
                                if packageStatus.Stage != runningPackage.Stage {
2✔
1995
                                        deleteMe = true
1✔
1996
                                }
1✔
1997
                        }
1998

1999
                        if deleteMe {
2✔
2000
                                update = true
1✔
2001
                                err := r.InvalidPackage(ctx, &pod)
1✔
2002
                                if err != nil {
2✔
2003
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
1✔
2004
                                }
1✔
2005
                        }
2006
                }
2007
        }
2008

2009
        return update, utilerrors.NewAggregate(errs)
1✔
2010
}
2011

2012
// InvalidPackage invalidates a package and updates the pod, which will trigger the pod to be deleted
2013
func (r *SkyhookReconciler) InvalidPackage(ctx context.Context, pod *corev1.Pod) error {
1✔
2014
        err := InvalidatePackage(pod)
1✔
2015
        if err != nil {
1✔
2016
                return fmt.Errorf("error invalidating package: %w", err)
×
2017
        }
×
2018

2019
        err = r.Update(ctx, pod)
1✔
2020
        if err != nil {
2✔
2021
                return fmt.Errorf("error updating pod: %w", err)
1✔
2022
        }
1✔
2023

2024
        return nil
1✔
2025
}
2026

2027
// ProcessInterrupt will check and do the interrupt if need, and returns
2028
// false means we are waiting
2029
// true means we are good to proceed
2030
func (r *SkyhookReconciler) ProcessInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (bool, error) {
1✔
2031

1✔
2032
        if !skyhookNode.HasInterrupt(*_package) {
2✔
2033
                return true, nil
1✔
2034
        }
1✔
2035

2036
        // default starting stage
2037
        stage := v1alpha1.StageApply
1✔
2038
        nextStage := skyhookNode.NextStage(_package)
1✔
2039
        if nextStage != nil {
2✔
2040
                stage = *nextStage
1✔
2041
        }
1✔
2042

2043
        // wait tell this is done if its happening
2044
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
1✔
2045
        if found && status.State == v1alpha1.StateSkipped {
2✔
2046
                return false, nil
1✔
2047
        }
1✔
2048

2049
        // Theres is a race condition when a node reboots and api cleans up the interrupt pod
2050
        // so we need to check if the pod exists and if it does, we need to recreate it
2051
        if status != nil && (status.State == v1alpha1.StateInProgress || status.State == v1alpha1.StateErroring) && status.Stage == v1alpha1.StageInterrupt {
2✔
2052
                // call interrupt to recreate the pod if missing
1✔
2053
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
1✔
2054
                if err != nil {
1✔
2055
                        return false, err
×
2056
                }
×
2057
        }
2058

2059
        // drain and cordon node before applying package that has an interrupt
2060
        if stage == v1alpha1.StageApply {
2✔
2061
                ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, _package)
1✔
2062
                if err != nil {
1✔
2063
                        return false, err
×
2064
                }
×
2065

2066
                if !ready {
2✔
2067
                        return false, nil
1✔
2068
                }
1✔
2069
        }
2070

2071
        // time to interrupt (once other packages have finished)
2072
        if stage == v1alpha1.StageInterrupt && runInterrupt {
2✔
2073
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
1✔
2074
                if err != nil {
1✔
2075
                        return false, err
×
2076
                }
×
2077

2078
                return false, nil
1✔
2079
        }
2080

2081
        //skipping
2082
        if stage == v1alpha1.StageInterrupt && !runInterrupt {
2✔
2083
                err := skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, stage, 0, _package.ContainerSHA)
1✔
2084
                if err != nil {
1✔
2085
                        return false, fmt.Errorf("error upserting to skip interrupt: %w", err)
×
2086
                }
×
2087
                return false, nil
1✔
2088
        }
2089

2090
        // wait tell this is done if its happening
2091
        if status != nil && status.Stage == v1alpha1.StageInterrupt && status.State != v1alpha1.StateComplete {
2✔
2092
                return false, nil
1✔
2093
        }
1✔
2094

2095
        return true, nil
1✔
2096
}
2097

2098
func (r *SkyhookReconciler) EnsureNodeIsReadyForInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
1✔
2099
        // cordon node
1✔
2100
        skyhookNode.Cordon()
1✔
2101

1✔
2102
        hasWork, err := r.HasNonInterruptWork(ctx, skyhookNode)
1✔
2103
        if err != nil {
1✔
2104
                return false, err
×
2105
        }
×
2106
        if hasWork { // keep waiting...
2✔
2107
                return false, nil
1✔
2108
        }
1✔
2109

2110
        ready, err := r.DrainNode(ctx, skyhookNode, _package)
1✔
2111
        if err != nil {
1✔
2112
                return false, fmt.Errorf("error draining node [%s]: %w", skyhookNode.GetNode().Name, err)
×
2113
        }
×
2114

2115
        return ready, nil
1✔
2116
}
2117

2118
// ApplyPackage starts a pod on node for the package
2119
func (r *SkyhookReconciler) ApplyPackage(ctx context.Context, logger logr.Logger, clusterState *clusterState, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, runInterrupt bool) error {
1✔
2120

1✔
2121
        if _package == nil {
1✔
2122
                return errors.New("can not apply nil package")
×
2123
        }
×
2124

2125
        // default starting stage
2126
        stage := v1alpha1.StageApply
1✔
2127

1✔
2128
        // These modes don't have anything that comes before them so we must specify them as the
1✔
2129
        // starting point. The next stage function will return nil until these modes complete.
1✔
2130
        // Config is a special case as sometimes apply will come before it and other times it wont
1✔
2131
        // which is why it needs to be here as well
1✔
2132
        if packageStatus, found := skyhookNode.PackageStatus(_package.GetUniqueName()); found {
2✔
2133
                switch packageStatus.Stage {
1✔
2134
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
1✔
2135
                        stage = packageStatus.Stage
1✔
2136
                }
2137
        }
2138

2139
        // if stage != v1alpha1.StageApply {
2140
        //         // If a node gets rest by a user, the about method will return the wrong node state. Above sources it from the skyhook status.
2141
        //         // check if the node has nothing, reset it then apply the package.
2142
        //         nodeState, err := skyhookNode.State()
2143
        //         if err != nil {
2144
        //                 return fmt.Errorf("error getting node state: %w", err)
2145
        //         }
2146

2147
        //         _, found := nodeState[_package.GetUniqueName()]
2148
        //         if !found {
2149
        //                 stage = v1alpha1.StageApply
2150
        //         }
2151
        // }
2152

2153
        nextStage := skyhookNode.NextStage(_package)
1✔
2154
        if nextStage != nil {
2✔
2155
                stage = *nextStage
1✔
2156
        }
1✔
2157

2158
        // test if pod exists, if so, bailout
2159
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
1✔
2160
        if err != nil {
1✔
2161
                return err
×
2162
        }
×
2163

2164
        // wait tell this is done if its happening
2165
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
1✔
2166

1✔
2167
        if found && status.State == v1alpha1.StateSkipped { // skipped, so nothing to do
1✔
2168
                return nil
×
2169
        }
×
2170

2171
        if found && status.State == v1alpha1.StateInProgress { // running, so do nothing atm
2✔
2172
                if exists {
2✔
2173
                        return nil
1✔
2174
                }
1✔
2175
        }
2176

2177
        if exists {
2✔
2178
                // nothing to do here, already running
1✔
2179
                return nil
1✔
2180
        }
1✔
2181

2182
        pod := createPodFromPackage(r.opts, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name, stage)
1✔
2183

1✔
2184
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, stage, _package); err != nil {
1✔
2185
                return fmt.Errorf("error setting package on pod: %w", err)
×
2186
        }
×
2187

2188
        // setup ownership of the pod we created
2189
        // helps run time know what to do when something happens to this pod we are about to create
2190
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
1✔
2191
                return fmt.Errorf("error setting ownership: %w", err)
×
2192
        }
×
2193

2194
        if err := r.Create(ctx, pod); err != nil {
1✔
2195
                return fmt.Errorf("error creating pod: %w", err)
×
2196
        }
×
2197

2198
        if err = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, stage, 0, _package.ContainerSHA); err != nil {
1✔
2199
                err = fmt.Errorf("error upserting package: %w", err) // want to keep going in this case, but don't want to lose the err
×
2200
        }
×
2201

2202
        skyhookNode.SetStatus(v1alpha1.StatusInProgress)
1✔
2203

1✔
2204
        skyhookNode.GetSkyhook().AddCondition(metav1.Condition{
1✔
2205
                Type:               fmt.Sprintf("%s/ApplyPackage", v1alpha1.METADATA_PREFIX),
1✔
2206
                Status:             metav1.ConditionTrue,
1✔
2207
                ObservedGeneration: skyhookNode.GetSkyhook().Generation,
1✔
2208
                LastTransitionTime: metav1.Now(),
1✔
2209
                Reason:             "ApplyPackage",
1✔
2210
                Message:            fmt.Sprintf("Applying package [%s:%s] to node [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name),
1✔
2211
        })
1✔
2212

1✔
2213
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] from [skyhook:%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetSkyhook().Name, stage)
1✔
2214
        r.recorder.Eventf(skyhookNode.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] to node [%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name, stage)
1✔
2215

1✔
2216
        skyhookNode.GetSkyhook().Updated = true
1✔
2217

1✔
2218
        return err
1✔
2219
}
2220

2221
// HandleRuntimeRequired finds any nodes for which all runtime required Skyhooks are complete and remove their runtime required taint
2222
// Will return an error if the patching of the nodes is not possible
2223
func (r *SkyhookReconciler) HandleRuntimeRequired(ctx context.Context, clusterState *clusterState) error {
1✔
2224
        node_to_skyhooks, skyhook_node_map := groupSkyhooksByNode(clusterState)
1✔
2225
        to_remove := getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks, skyhook_node_map)
1✔
2226
        // Remove the runtime required taint from nodes in to_remove
1✔
2227
        taint_to_remove := r.opts.GetRuntimeRequiredTaint()
1✔
2228
        errs := make([]error, 0)
1✔
2229
        for _, node := range to_remove {
2✔
2230
                // check before removing taint that it even exists to begin with
1✔
2231
                if !taints.TaintExists(node.Spec.Taints, &taint_to_remove) {
2✔
2232
                        continue
1✔
2233
                }
2234
                // RemoveTaint will ALWAYS return nil for its error so no need to check it
2235
                new_node, updated, _ := taints.RemoveTaint(node, &taint_to_remove)
1✔
2236
                if updated {
2✔
2237
                        err := r.Patch(ctx, new_node, client.MergeFrom(node))
1✔
2238
                        if err != nil {
1✔
2239
                                errs = append(errs, err)
×
2240
                        }
×
2241
                }
2242
        }
2243
        if len(errs) > 0 {
1✔
2244
                return utilerrors.NewAggregate(errs)
×
2245
        }
×
2246
        return nil
1✔
2247
}
2248

2249
// Group Skyhooks by what node they target
2250
func groupSkyhooksByNode(clusterState *clusterState) (map[types.UID][]SkyhookNodes, map[types.UID]*corev1.Node) {
1✔
2251
        node_to_skyhooks := make(map[types.UID][]SkyhookNodes)
1✔
2252
        nodes := make(map[types.UID]*corev1.Node)
1✔
2253
        for _, skyhook := range clusterState.skyhooks {
2✔
2254
                // Ignore skyhooks that don't have runtime required
1✔
2255
                if !skyhook.GetSkyhook().Spec.RuntimeRequired {
2✔
2256
                        continue
1✔
2257
                }
2258
                for _, node := range skyhook.GetNodes() {
2✔
2259
                        if _, ok := node_to_skyhooks[node.GetNode().UID]; !ok {
2✔
2260
                                node_to_skyhooks[node.GetNode().UID] = make([]SkyhookNodes, 0)
1✔
2261
                                nodes[node.GetNode().UID] = node.GetNode()
1✔
2262
                        }
1✔
2263
                        node_to_skyhooks[node.GetNode().UID] = append(node_to_skyhooks[node.GetNode().UID], skyhook)
1✔
2264
                }
2265

2266
        }
2267
        return node_to_skyhooks, nodes
1✔
2268
}
2269

2270
// Get the nodes to remove runtime required taint from node that all skyhooks targeting that node have completed
2271
func getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks map[types.UID][]SkyhookNodes, nodes map[types.UID]*corev1.Node) []*corev1.Node {
1✔
2272
        to_remove := make([]*corev1.Node, 0)
1✔
2273
        for node_uid, skyhooks := range node_to_skyhooks {
2✔
2274
                all_complete := true
1✔
2275
                for _, skyhook := range skyhooks {
2✔
2276
                        if !skyhook.IsComplete() {
2✔
2277
                                all_complete = false
1✔
2278
                                break
1✔
2279
                        }
2280
                }
2281
                if all_complete {
2✔
2282
                        to_remove = append(to_remove, nodes[node_uid])
1✔
2283
                }
1✔
2284
        }
2285
        return to_remove
1✔
2286
}
2287

2288
// setPodResources sets resources for all containers and init containers in the pod if override is set, else leaves empty for LimitRange
2289
func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
1✔
2290
        if res == nil {
2✔
2291
                return
1✔
2292
        }
1✔
2293
        if !res.CPURequest.IsZero() || !res.CPULimit.IsZero() || !res.MemoryRequest.IsZero() || !res.MemoryLimit.IsZero() {
2✔
2294
                for i := range pod.Spec.InitContainers {
2✔
2295
                        pod.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{
1✔
2296
                                Limits: corev1.ResourceList{
1✔
2297
                                        corev1.ResourceCPU:    res.CPULimit,
1✔
2298
                                        corev1.ResourceMemory: res.MemoryLimit,
1✔
2299
                                },
1✔
2300
                                Requests: corev1.ResourceList{
1✔
2301
                                        corev1.ResourceCPU:    res.CPURequest,
1✔
2302
                                        corev1.ResourceMemory: res.MemoryRequest,
1✔
2303
                                },
1✔
2304
                        }
1✔
2305
                }
1✔
2306
        }
2307
}
2308

2309
// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
2310
func partitionNodesIntoCompartments(clusterState *clusterState) error {
1✔
2311
        for _, skyhook := range clusterState.skyhooks {
2✔
2312
                // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState)
1✔
2313
                if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" {
2✔
2314
                        continue
1✔
2315
                }
2316

2317
                // Clear all compartments before reassigning nodes to prevent stale nodes
2318
                // This ensures nodes are only in their current compartment based on current labels
2319
                for _, compartment := range skyhook.GetCompartments() {
2✔
2320
                        compartment.ClearNodes()
1✔
2321
                }
1✔
2322

2323
                for _, node := range skyhook.GetNodes() {
2✔
2324
                        compartmentName, err := skyhook.AssignNodeToCompartment(node)
1✔
2325
                        if err != nil {
1✔
2326
                                return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
×
2327
                        }
×
2328
                        skyhook.AddCompartmentNode(compartmentName, node)
1✔
2329
                }
2330
        }
2331

2332
        return nil
1✔
2333
}
2334

2335
// validateAndUpsertSkyhookData performs validation and configmap operations for a skyhook
2336
func (r *SkyhookReconciler) validateAndUpsertSkyhookData(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, ctrl.Result, error) {
1✔
2337
        if yes, result, err := shouldReturn(r.ValidateRunningPackages(ctx, skyhook)); yes {
2✔
2338
                return yes, result, err
1✔
2339
        }
1✔
2340

2341
        if yes, result, err := shouldReturn(r.ValidateNodeConfigmaps(ctx, skyhook.GetSkyhook().Name, skyhook.GetNodes())); yes {
2✔
2342
                return yes, result, err
1✔
2343
        }
1✔
2344

2345
        if yes, result, err := shouldReturn(r.UpsertConfigmaps(ctx, skyhook, clusterState)); yes {
2✔
2346
                return yes, result, err
1✔
2347
        }
1✔
2348

2349
        return false, ctrl.Result{}, nil
1✔
2350
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc