• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NVIDIA / skyhook / 20494412809

24 Dec 2025 09:50PM UTC coverage: 81.314% (+6.0%) from 75.355%
20494412809

push

github

lockwobr
feat: added new tests to cover the new webhook setup

6384 of 7851 relevant lines covered (81.31%)

4.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.58
/operator/internal/controller/skyhook_controller.go
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package controller
20

21
import (
22
        "cmp"
23
        "context"
24
        "crypto/sha256"
25
        "encoding/hex"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "slices"
31
        "sort"
32
        "strings"
33
        "time"
34

35
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
36
        "github.com/NVIDIA/skyhook/operator/internal/dal"
37
        "github.com/NVIDIA/skyhook/operator/internal/version"
38
        "github.com/NVIDIA/skyhook/operator/internal/wrapper"
39
        "github.com/go-logr/logr"
40

41
        corev1 "k8s.io/api/core/v1"
42
        policyv1 "k8s.io/api/policy/v1"
43
        apierrors "k8s.io/apimachinery/pkg/api/errors"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/runtime"
47
        "k8s.io/apimachinery/pkg/types"
48
        utilerrors "k8s.io/apimachinery/pkg/util/errors"
49
        "k8s.io/client-go/tools/record"
50
        "k8s.io/kubernetes/pkg/util/taints"
51
        ctrl "sigs.k8s.io/controller-runtime"
52
        "sigs.k8s.io/controller-runtime/pkg/client"
53
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/log"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
)
58

59
const (
60
        EventsReasonSkyhookApply       = "Apply"
61
        EventsReasonSkyhookInterrupt   = "Interrupt"
62
        EventsReasonSkyhookDrain       = "Drain"
63
        EventsReasonSkyhookStateChange = "State"
64
        EventsReasonNodeReboot         = "Reboot"
65
        EventTypeNormal                = "Normal"
66
        // EventTypeWarning = "Warning"
67
        TaintUnschedulable     = corev1.TaintNodeUnschedulable
68
        InterruptContainerName = "interrupt"
69

70
        SkyhookFinalizer = "skyhook.nvidia.com/skyhook"
71
)
72

73
// SkyhookOperatorOptions holds the operator level configuration. Each field
// carries an `env` tag naming the environment variable it is read from and the
// default applied when that variable is unset. Call Validate before use.
type SkyhookOperatorOptions struct {
	// Namespace must be non-empty (see Validate).
	Namespace string `env:"NAMESPACE, default=skyhook"`

	// MaxInterval is the requeue delay used when a reconcile finishes with
	// nothing left to do; Validate requires at least one minute.
	MaxInterval time.Duration `env:"DEFAULT_INTERVAL, default=10m"`

	// ImagePullSecret names an image pull secret.
	//TODO: should this be defaulted?
	ImagePullSecret string `env:"IMAGE_PULL_SECRET, default=node-init-secret"`

	// CopyDirRoot must be non-empty and start with "/" (see Validate).
	CopyDirRoot string `env:"COPY_DIR_ROOT, default=/var/lib/skyhook"`

	// ReapplyOnReboot makes TrackReboots reset a node when its boot ID changes.
	ReapplyOnReboot bool `env:"REAPPLY_ON_REBOOT, default=false"`

	// RuntimeRequiredTaint is a taint specification parsed with
	// taints.ParseTaints; it must not describe a taint removal.
	RuntimeRequiredTaint string `env:"RUNTIME_REQUIRED_TAINT, default=skyhook.nvidia.com=runtime-required:NoSchedule"`

	// PauseImage is an image reference that must carry a tag.
	PauseImage string `env:"PAUSE_IMAGE, default=registry.k8s.io/pause:3.10"`

	// AgentImage is an image reference that must carry a tag.
	// TODO: this needs to be updated with a working default
	AgentImage string `env:"AGENT_IMAGE, default=ghcr.io/nvidia/skyhook/agent:latest"`

	// AgentLogRoot is presumably the directory the agent logs to (usage is
	// outside this part of the file; verify).
	AgentLogRoot string `env:"AGENT_LOG_ROOT, default=/var/log/skyhook"`
}
84

85
func (o *SkyhookOperatorOptions) Validate() error {
8✔
86

8✔
87
        messages := make([]string, 0)
8✔
88
        if o.Namespace == "" {
8✔
89
                messages = append(messages, "namespace must be set")
×
90
        }
×
91
        if o.CopyDirRoot == "" {
8✔
92
                messages = append(messages, "copy dir root must be set")
×
93
        }
×
94
        if o.RuntimeRequiredTaint == "" {
8✔
95
                messages = append(messages, "runtime required taint must be set")
×
96
        }
×
97
        if o.MaxInterval < time.Minute {
9✔
98
                messages = append(messages, "max interval must be at least 1 minute")
1✔
99
        }
1✔
100

101
        // CopyDirRoot must start with /
102
        if !strings.HasPrefix(o.CopyDirRoot, "/") {
9✔
103
                messages = append(messages, "copy dir root must start with /")
1✔
104
        }
1✔
105

106
        // RuntimeRequiredTaint must be parsable and must not be a deletion
107
        _, delete, err := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
108
        if err != nil {
9✔
109
                messages = append(messages, fmt.Sprintf("runtime required taint is invalid: %s", err.Error()))
1✔
110
        }
1✔
111
        if len(delete) > 0 {
9✔
112
                messages = append(messages, "runtime required taint must not be a deletion")
1✔
113
        }
1✔
114

115
        if o.AgentImage == "" {
9✔
116
                messages = append(messages, "agent image must be set")
1✔
117
        }
1✔
118

119
        if !strings.Contains(o.AgentImage, ":") {
9✔
120
                messages = append(messages, "agent image must contain a tag")
1✔
121
        }
1✔
122

123
        if o.PauseImage == "" {
9✔
124
                messages = append(messages, "pause image must be set")
1✔
125
        }
1✔
126

127
        if !strings.Contains(o.PauseImage, ":") {
9✔
128
                messages = append(messages, "pause image must contain a tag")
1✔
129
        }
1✔
130

131
        if len(messages) > 0 {
9✔
132
                return errors.New(strings.Join(messages, ", "))
1✔
133
        }
1✔
134

135
        return nil
8✔
136
}
137

138
// AgentVersion returns the image tag portion of AgentImage
139
func (o *SkyhookOperatorOptions) AgentVersion() string {
8✔
140
        parts := strings.Split(o.AgentImage, ":")
8✔
141
        return parts[len(parts)-1]
8✔
142
}
8✔
143

144
func (o *SkyhookOperatorOptions) GetRuntimeRequiredTaint() corev1.Taint {
8✔
145
        to_add, _, _ := taints.ParseTaints([]string{o.RuntimeRequiredTaint})
8✔
146
        return to_add[0]
8✔
147
}
8✔
148

149
func (o *SkyhookOperatorOptions) GetRuntimeRequiredToleration() corev1.Toleration {
8✔
150
        taint := o.GetRuntimeRequiredTaint()
8✔
151
        return corev1.Toleration{
8✔
152
                Key:      taint.Key,
8✔
153
                Operator: corev1.TolerationOpEqual,
8✔
154
                Value:    taint.Value,
8✔
155
                Effect:   taint.Effect,
8✔
156
        }
8✔
157
}
8✔
158

159
// force type checking against this interface
160
var _ reconcile.Reconciler = &SkyhookReconciler{}
161

162
func NewSkyhookReconciler(schema *runtime.Scheme, c client.Client, recorder record.EventRecorder, opts SkyhookOperatorOptions) (*SkyhookReconciler, error) {
8✔
163

8✔
164
        err := opts.Validate()
8✔
165
        if err != nil {
8✔
166
                return nil, fmt.Errorf("invalid skyhook operator options: %w", err)
×
167
        }
×
168

169
        return &SkyhookReconciler{
8✔
170
                Client:   c,
8✔
171
                scheme:   schema,
8✔
172
                recorder: recorder,
8✔
173
                opts:     opts,
8✔
174
                dal:      dal.New(c),
8✔
175
        }, nil
8✔
176
}
177

178
// SkyhookReconciler reconciles a Skyhook object
179
type SkyhookReconciler struct {
180
        client.Client
181
        scheme   *runtime.Scheme
182
        recorder record.EventRecorder
183
        opts     SkyhookOperatorOptions
184
        dal      dal.DAL
185
}
186

187
// SetupWithManager sets up the controller with the Manager.
188
func (r *SkyhookReconciler) SetupWithManager(mgr ctrl.Manager) error {
8✔
189

8✔
190
        // indexes allow for query on fields to use the local cache
8✔
191
        indexer := mgr.GetFieldIndexer()
8✔
192
        err := indexer.
8✔
193
                IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
15✔
194
                        pod, ok := o.(*corev1.Pod)
7✔
195
                        if !ok {
7✔
196
                                return nil
×
197
                        }
×
198
                        return []string{pod.Spec.NodeName}
7✔
199
                })
200

201
        if err != nil {
8✔
202
                return err
×
203
        }
×
204

205
        ehandler := &eventHandler{
8✔
206
                logger: mgr.GetLogger(),
8✔
207
                dal:    dal.New(r.Client),
8✔
208
        }
8✔
209

8✔
210
        return ctrl.NewControllerManagedBy(mgr).
8✔
211
                For(&v1alpha1.Skyhook{}).
8✔
212
                Watches(
8✔
213
                        &corev1.Pod{},
8✔
214
                        handler.EnqueueRequestsFromMapFunc(podHandlerFunc),
8✔
215
                ).
8✔
216
                Watches(
8✔
217
                        &corev1.Node{},
8✔
218
                        ehandler,
8✔
219
                ).
8✔
220
                Complete(r)
8✔
221
}
222

223
// CRD Permissions
224
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks,verbs=get;list;watch;create;update;patch;delete
225
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/status,verbs=get;update;patch
226
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=skyhooks/finalizers,verbs=update
227
//+kubebuilder:rbac:groups=skyhook.nvidia.com,resources=deploymentpolicies,verbs=get;list;watch
228

229
// core permissions
230
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;update;patch;watch
231
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
232
//+kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
233
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
234
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
235

236
// Reconcile is part of the main kubernetes reconciliation loop which aims to
237
// move the current state of the cluster closer to the desired state.
238
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile
239
func (r *SkyhookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
8✔
240
        logger := log.FromContext(ctx)
8✔
241
        // split off requests for pods
8✔
242
        if strings.HasPrefix(req.Name, "pod---") {
15✔
243
                name := strings.Split(req.Name, "pod---")[1]
7✔
244
                pod, err := r.dal.GetPod(ctx, req.Namespace, name)
7✔
245
                if err == nil && pod != nil { // if pod, then call other wise not a pod
14✔
246
                        return r.PodReconcile(ctx, pod)
7✔
247
                }
7✔
248
                return ctrl.Result{}, err
7✔
249
        }
250

251
        // get all skyhooks (SCR)
252
        skyhooks, err := r.dal.GetSkyhooks(ctx)
8✔
253
        if err != nil {
8✔
254
                // error, going to requeue and backoff
×
255
                logger.Error(err, "error getting skyhooks")
×
256
                return ctrl.Result{}, err
×
257
        }
×
258

259
        // if there are no skyhooks, so actually nothing to do, so don't requeue
260
        if skyhooks == nil || len(skyhooks.Items) == 0 {
16✔
261
                return ctrl.Result{}, nil
8✔
262
        }
8✔
263

264
        // get all nodes
265
        nodes, err := r.dal.GetNodes(ctx)
7✔
266
        if err != nil {
7✔
267
                // error, going to requeue and backoff
×
268
                logger.Error(err, "error getting nodes")
×
269
                return ctrl.Result{}, err
×
270
        }
×
271

272
        // if no nodes, well not work to do either
273
        if nodes == nil || len(nodes.Items) == 0 {
7✔
274
                // no nodes, so nothing to do
×
275
                return ctrl.Result{}, nil
×
276
        }
×
277

278
        // get all deployment policies
279
        deploymentPolicies, err := r.dal.GetDeploymentPolicies(ctx)
7✔
280
        if err != nil {
7✔
281
                logger.Error(err, "error getting deployment policies")
×
282
                return ctrl.Result{}, err
×
283
        }
×
284

285
        // TODO: this build state could error in a lot of ways, and I think we might want to move towards partial state
286
        // mean if we cant get on SCR state, great, process that one and error
287

288
        // BUILD cluster state from all skyhooks, and all nodes
289
        // this filters and pairs up nodes to skyhooks, also provides help methods for introspection and mutation
290
        clusterState, err := BuildState(skyhooks, nodes, deploymentPolicies)
7✔
291
        if err != nil {
7✔
292
                // error, going to requeue and backoff
×
293
                logger.Error(err, "error building cluster state")
×
294
                return ctrl.Result{}, err
×
295
        }
×
296

297
        if yes, result, err := shouldReturn(r.HandleMigrations(ctx, clusterState)); yes {
14✔
298
                return result, err
7✔
299
        }
7✔
300

301
        if yes, result, err := shouldReturn(r.TrackReboots(ctx, clusterState)); yes {
14✔
302
                return result, err
7✔
303
        }
7✔
304

305
        // node picker is for selecting nodes to do work, tries maintain a prior of nodes between SCRs
306
        nodePicker := NewNodePicker(logger, r.opts.GetRuntimeRequiredToleration())
7✔
307

7✔
308
        errs := make([]error, 0)
7✔
309
        var result *ctrl.Result
7✔
310

7✔
311
        for _, skyhook := range clusterState.skyhooks {
14✔
312
                if yes, result, err := shouldReturn(r.HandleFinalizer(ctx, skyhook)); yes {
14✔
313
                        return result, err
7✔
314
                }
7✔
315

316
                if yes, result, err := shouldReturn(r.ReportState(ctx, clusterState, skyhook)); yes {
14✔
317
                        return result, err
7✔
318
                }
7✔
319

320
                if skyhook.IsPaused() {
13✔
321
                        if yes, result, err := shouldReturn(r.UpdatePauseStatus(ctx, clusterState, skyhook)); yes {
12✔
322
                                return result, err
6✔
323
                        }
6✔
324
                        continue
6✔
325
                }
326

327
                if yes, result, err := r.validateAndUpsertSkyhookData(ctx, skyhook, clusterState); yes {
14✔
328
                        return result, err
7✔
329
                }
7✔
330

331
                changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
332
                if changed {
14✔
333
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
334
                        if len(errs) > 0 {
12✔
335
                                return ctrl.Result{RequeueAfter: time.Second * 2}, utilerrors.NewAggregate(errs)
5✔
336
                        }
5✔
337
                        return ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
338
                }
339

340
                _, err := HandleVersionChange(skyhook)
7✔
341
                if err != nil {
7✔
342
                        return ctrl.Result{RequeueAfter: time.Second * 2}, fmt.Errorf("error getting packages to uninstall: %w", err)
×
343
                }
×
344
        }
345

346
        skyhook := GetNextSkyhook(clusterState.skyhooks)
7✔
347
        if skyhook != nil && !skyhook.IsPaused() {
14✔
348

7✔
349
                result, err = r.RunSkyhookPackages(ctx, clusterState, nodePicker, skyhook)
7✔
350
                if err != nil {
12✔
351
                        logger.Error(err, "error processing skyhook", "skyhook", skyhook.GetSkyhook().Name)
5✔
352
                        errs = append(errs, err)
5✔
353
                }
5✔
354
        }
355

356
        err = r.HandleRuntimeRequired(ctx, clusterState)
7✔
357
        if err != nil {
7✔
358
                errs = append(errs, err)
×
359
        }
×
360

361
        if len(errs) > 0 {
12✔
362
                err := utilerrors.NewAggregate(errs)
5✔
363
                return ctrl.Result{}, err
5✔
364
        }
5✔
365

366
        if result != nil {
14✔
367
                return *result, nil
7✔
368
        }
7✔
369

370
        // default happy retry after max
371
        return ctrl.Result{RequeueAfter: r.opts.MaxInterval}, nil
7✔
372
}
373

374
func shouldReturn(updates bool, err error) (bool, ctrl.Result, error) {
7✔
375
        if err != nil {
14✔
376
                return true, ctrl.Result{}, err
7✔
377
        }
7✔
378
        if updates {
14✔
379
                return true, ctrl.Result{RequeueAfter: time.Second * 2}, nil
7✔
380
        }
7✔
381
        return false, ctrl.Result{}, nil
7✔
382
}
383

384
func (r *SkyhookReconciler) HandleMigrations(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
385

7✔
386
        updates := false
7✔
387

7✔
388
        if version.VERSION == "" {
7✔
389
                // this means the binary was complied without version information
×
390
                return false, nil
×
391
        }
×
392

393
        logger := log.FromContext(ctx)
7✔
394
        errors := make([]error, 0)
7✔
395
        for _, skyhook := range clusterState.skyhooks {
14✔
396

7✔
397
                err := skyhook.Migrate(logger)
7✔
398
                if err != nil {
7✔
399
                        return false, fmt.Errorf("error migrating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
400
                }
×
401

402
                if err := skyhook.GetSkyhook().Skyhook.Validate(); err != nil {
7✔
403
                        return false, fmt.Errorf("error validating skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
404
                }
×
405

406
                for _, node := range skyhook.GetNodes() {
14✔
407
                        if node.Changed() {
14✔
408
                                err := r.Status().Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
409
                                if err != nil {
7✔
410
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
411
                                }
×
412

413
                                err = r.Patch(ctx, node.GetNode(), client.MergeFrom(clusterState.tracker.GetOriginal(node.GetNode())))
7✔
414
                                if err != nil {
7✔
415
                                        errors = append(errors, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
416
                                }
×
417
                                updates = true
7✔
418
                        }
419
                }
420

421
                if skyhook.GetSkyhook().Updated {
14✔
422
                        // need to do this because SaveNodesAndSkyhook only saves skyhook status, not the main skyhook object where the annotations are
7✔
423
                        // additionally it needs to be an update, a patch nils out the annotations for some reason, which the save function does a patch
7✔
424

7✔
425
                        if err = r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
426
                                return false, fmt.Errorf("error updating during migration skyhook status [%s]: %w", skyhook.GetSkyhook().Name, err)
7✔
427
                        }
7✔
428

429
                        // because of conflict issues (409) we need to do things a bit differently here.
430
                        // We might be able to use server side apply in the future, but for now we need to do this
431
                        // https://kubernetes.io/docs/reference/using-api/server-side-apply/
432
                        // https://github.com/kubernetes-sigs/controller-runtime/issues/347
433

434
                        // work around for now is to grab a new copy of the object, and then patch it
435

436
                        newskyhook, err := r.dal.GetSkyhook(ctx, skyhook.GetSkyhook().Name)
7✔
437
                        if err != nil {
7✔
438
                                return false, fmt.Errorf("error getting skyhook to migrate [%s]: %w", skyhook.GetSkyhook().Name, err)
×
439
                        }
×
440
                        newPatch := client.MergeFrom(newskyhook.DeepCopy())
7✔
441

7✔
442
                        // set version
7✔
443
                        wrapper.NewSkyhookWrapper(newskyhook).SetVersion()
7✔
444

7✔
445
                        if err = r.Patch(ctx, newskyhook, newPatch); err != nil {
7✔
446
                                return false, fmt.Errorf("error updating during migration skyhook [%s]: %w", skyhook.GetSkyhook().Name, err)
×
447
                        }
×
448

449
                        updates = true
7✔
450
                }
451
        }
452

453
        if len(errors) > 0 {
7✔
454
                return false, utilerrors.NewAggregate(errors)
×
455
        }
×
456

457
        return updates, nil
7✔
458
}
459

460
// ReportState computes and puts important information into the skyhook status so that monitoring tools such as k9s
461
// can see the information at a glance. For example, the number of completed nodes and the list of packages in the skyhook.
462
func (r *SkyhookReconciler) ReportState(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
7✔
463

7✔
464
        // save updated state to skyhook status
7✔
465
        skyhook.ReportState()
7✔
466

7✔
467
        if skyhook.GetSkyhook().Updated {
14✔
468
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
469
                if len(errs) > 0 {
7✔
470
                        return false, utilerrors.NewAggregate(errs)
×
471
                }
×
472
                return true, nil
7✔
473
        }
474

475
        return false, nil
7✔
476
}
477

478
func (r *SkyhookReconciler) UpdatePauseStatus(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, error) {
6✔
479
        changed := UpdateSkyhookPauseStatus(skyhook)
6✔
480

6✔
481
        if changed {
12✔
482
                _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
6✔
483
                if len(errs) > 0 {
7✔
484
                        return false, utilerrors.NewAggregate(errs)
1✔
485
                }
1✔
486
                return true, nil
6✔
487
        }
488

489
        return false, nil
6✔
490
}
491

492
func (r *SkyhookReconciler) TrackReboots(ctx context.Context, clusterState *clusterState) (bool, error) {
7✔
493

7✔
494
        updates := false
7✔
495
        errs := make([]error, 0)
7✔
496

7✔
497
        for _, skyhook := range clusterState.skyhooks {
14✔
498
                if skyhook.GetSkyhook().Status.NodeBootIds == nil {
14✔
499
                        skyhook.GetSkyhook().Status.NodeBootIds = make(map[string]string)
7✔
500
                }
7✔
501

502
                for _, node := range skyhook.GetNodes() {
14✔
503
                        id, ok := skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name]
7✔
504

7✔
505
                        if !ok { // new node
14✔
506
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
7✔
507
                                skyhook.GetSkyhook().Updated = true
7✔
508
                        }
7✔
509

510
                        if id != "" && id != node.GetNode().Status.NodeInfo.BootID { // node rebooted
7✔
511
                                if r.opts.ReapplyOnReboot {
×
512
                                        r.recorder.Eventf(skyhook.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node [%s] to be reapplied", node.GetNode().Name)
×
513
                                        r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonNodeReboot, "detected reboot, resetting node for [%s] to be reapplied", node.GetSkyhook().Name)
×
514
                                        node.Reset()
×
515
                                }
×
516
                                skyhook.GetSkyhook().Status.NodeBootIds[node.GetNode().Name] = node.GetNode().Status.NodeInfo.BootID
×
517
                                skyhook.GetSkyhook().Updated = true
×
518
                        }
519

520
                        if node.Changed() { // update
7✔
521
                                updates = true
×
522
                                err := r.Update(ctx, node.GetNode())
×
523
                                if err != nil {
×
524
                                        errs = append(errs, fmt.Errorf("error updating node after reboot [%s]: %w", node.GetNode().Name, err))
×
525
                                }
×
526
                        }
527
                }
528
                if skyhook.GetSkyhook().Updated { // update
14✔
529
                        updates = true
7✔
530
                        err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook)
7✔
531
                        if err != nil {
14✔
532
                                errs = append(errs, fmt.Errorf("error updating skyhook status after reboot [%s]: %w", skyhook.GetSkyhook().Name, err))
7✔
533
                        }
7✔
534
                }
535
        }
536

537
        return updates, utilerrors.NewAggregate(errs)
7✔
538
}
539

540
// RunSkyhookPackages runs all skyhook packages then saves and requeues if changes were made
541
func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState *clusterState, nodePicker *NodePicker, skyhook SkyhookNodes) (*ctrl.Result, error) {
7✔
542

7✔
543
        logger := log.FromContext(ctx)
7✔
544
        requeue := false
7✔
545

7✔
546
        toUninstall, err := HandleVersionChange(skyhook)
7✔
547
        if err != nil {
7✔
548
                return nil, fmt.Errorf("error getting packages to uninstall: %w", err)
×
549
        }
×
550

551
        changed := IntrospectSkyhook(skyhook, clusterState.skyhooks)
7✔
552
        if !changed && skyhook.IsComplete() {
7✔
553
                return nil, nil
×
554
        }
×
555

556
        selectedNode := nodePicker.SelectNodes(skyhook)
7✔
557

7✔
558
        for _, node := range selectedNode {
14✔
559

7✔
560
                if node.IsComplete() && !node.Changed() {
7✔
561
                        continue
×
562
                }
563

564
                toRun, err := node.RunNext()
7✔
565
                if err != nil {
7✔
566
                        return nil, fmt.Errorf("error getting next packages to run: %w", err)
×
567
                }
×
568

569
                // prepend the uninstall packages so they are ran first
570
                toRun = append(toUninstall, toRun...)
7✔
571

7✔
572
                interrupt, pack := fudgeInterruptWithPriority(toRun, skyhook.GetSkyhook().GetConfigUpdates(), skyhook.GetSkyhook().GetConfigInterrupts())
7✔
573

7✔
574
                for _, f := range toRun {
14✔
575

7✔
576
                        ok, err := r.ProcessInterrupt(ctx, node, f, interrupt, interrupt != nil && f.Name == pack)
7✔
577
                        if err != nil {
7✔
578
                                // TODO: error handle
×
579
                                return nil, fmt.Errorf("error processing if we should interrupt [%s:%s]: %w", f.Name, f.Version, err)
×
580
                        }
×
581
                        if !ok {
12✔
582
                                requeue = true
5✔
583
                                continue
5✔
584
                        }
585

586
                        err = r.ApplyPackage(ctx, logger, clusterState, node, f, interrupt != nil && f.Name == pack)
7✔
587
                        if err != nil {
7✔
588
                                return nil, fmt.Errorf("error applying package [%s:%s]: %w", f.Name, f.Version, err)
×
589
                        }
×
590

591
                        // process one package at a time
592
                        if skyhook.GetSkyhook().Spec.Serial {
7✔
593
                                return &ctrl.Result{Requeue: true}, nil
×
594
                        }
×
595
                }
596
        }
597

598
        saved, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
7✔
599
        if len(errs) > 0 {
12✔
600
                return &ctrl.Result{}, utilerrors.NewAggregate(errs)
5✔
601
        }
5✔
602
        if saved {
14✔
603
                requeue = true
7✔
604
        }
7✔
605

606
        if !skyhook.IsComplete() || requeue {
14✔
607
                return &ctrl.Result{RequeueAfter: time.Second * 2}, nil // not sure this is better then just requeue bool
7✔
608
        }
7✔
609

610
        return nil, utilerrors.NewAggregate(errs)
×
611
}
612

613
// SaveNodesAndSkyhook saves nodes and skyhook and will update the events if the skyhook status changes
614
func (r *SkyhookReconciler) SaveNodesAndSkyhook(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes) (bool, []error) {
7✔
615
        saved := false
7✔
616
        errs := make([]error, 0)
7✔
617

7✔
618
        for _, node := range skyhook.GetNodes() {
14✔
619
                patch := client.StrategicMergeFrom(clusterState.tracker.GetOriginal(node.GetNode()))
7✔
620
                if node.Changed() {
14✔
621
                        err := r.Patch(ctx, node.GetNode(), patch)
7✔
622
                        if err != nil {
7✔
623
                                errs = append(errs, fmt.Errorf("error patching node [%s]: %w", node.GetNode().Name, err))
×
624
                        }
×
625
                        saved = true
7✔
626

7✔
627
                        err = r.UpsertNodeLabelsAnnotationsPackages(ctx, skyhook.GetSkyhook(), node.GetNode())
7✔
628
                        if err != nil {
7✔
629
                                errs = append(errs, fmt.Errorf("error upserting labels, annotations, and packages config map for node [%s]: %w", node.GetNode().Name, err))
×
630
                        }
×
631

632
                        if node.IsComplete() {
14✔
633
                                r.recorder.Eventf(node.GetNode(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook [%s] complete.", skyhook.GetSkyhook().Name)
7✔
634

7✔
635
                                // since node is complete remove from priority
7✔
636
                                if _, ok := skyhook.GetSkyhook().Status.NodePriority[node.GetNode().Name]; ok {
14✔
637
                                        delete(skyhook.GetSkyhook().Status.NodePriority, node.GetNode().Name)
7✔
638
                                        skyhook.GetSkyhook().Updated = true
7✔
639
                                }
7✔
640
                        }
641
                }
642

643
                // updates node's condition
644
                node.UpdateCondition()
7✔
645
                if node.Changed() {
14✔
646
                        // conditions are in status
7✔
647
                        err := r.Status().Patch(ctx, node.GetNode(), patch)
7✔
648
                        if err != nil {
12✔
649
                                errs = append(errs, fmt.Errorf("error patching node status [%s]: %w", node.GetNode().Name, err))
5✔
650
                        }
5✔
651
                        saved = true
7✔
652
                }
653

654
                if node.GetSkyhook() != nil && node.GetSkyhook().Updated {
14✔
655
                        skyhook.GetSkyhook().Updated = true
7✔
656
                }
7✔
657
        }
658

659
        if skyhook.GetSkyhook().Updated {
14✔
660
                patch := client.MergeFrom(clusterState.tracker.GetOriginal(skyhook.GetSkyhook().Skyhook))
7✔
661
                err := r.Status().Patch(ctx, skyhook.GetSkyhook().Skyhook, patch)
7✔
662
                if err != nil {
8✔
663
                        errs = append(errs, err)
1✔
664
                }
1✔
665
                saved = true
7✔
666

7✔
667
                if skyhook.GetPriorStatus() != "" && skyhook.GetPriorStatus() != skyhook.Status() {
14✔
668
                        // we transitioned, fire event
7✔
669
                        r.recorder.Eventf(skyhook.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookStateChange, "Skyhook transitioned [%s] -> [%s]", skyhook.GetPriorStatus(), skyhook.Status())
7✔
670
                }
7✔
671
        }
672

673
        if len(errs) > 0 {
13✔
674
                saved = false
6✔
675
        }
6✔
676
        return saved, errs
7✔
677
}
678

679
// HandleVersionChange updates the state for the node or skyhook if a version is changed on a package
680
func HandleVersionChange(skyhook SkyhookNodes) ([]*v1alpha1.Package, error) {
7✔
681
        toUninstall := make([]*v1alpha1.Package, 0)
7✔
682

7✔
683
        for _, node := range skyhook.GetNodes() {
14✔
684
                nodeState, err := node.State()
7✔
685
                if err != nil {
7✔
686
                        return nil, err
×
687
                }
×
688

689
                for _, packageStatus := range nodeState {
14✔
690
                        upgrade := false
7✔
691

7✔
692
                        _package, exists := skyhook.GetSkyhook().Spec.Packages[packageStatus.Name]
7✔
693
                        if exists && _package.Version == packageStatus.Version {
14✔
694
                                continue // no uninstall needed for package
7✔
695
                        }
696

697
                        packageStatusRef := v1alpha1.PackageRef{
5✔
698
                                Name:    packageStatus.Name,
5✔
699
                                Version: packageStatus.Version,
5✔
700
                        }
5✔
701

5✔
702
                        if !exists && packageStatus.Stage != v1alpha1.StageUninstall {
10✔
703
                                // Start uninstall of old package
5✔
704
                                err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
705
                                if err != nil {
5✔
706
                                        return nil, fmt.Errorf("error updating node status: %w", err)
×
707
                                }
×
708
                        } else if exists && _package.Version != packageStatus.Version {
10✔
709
                                comparison := version.Compare(_package.Version, packageStatus.Version)
5✔
710
                                if comparison == -2 {
5✔
711
                                        return nil, errors.New("error comparing package versions: invalid version string provided enabling webhooks validates versions before being applied")
×
712
                                }
×
713

714
                                if comparison == 1 {
10✔
715
                                        _packageStatus, found := node.PackageStatus(_package.GetUniqueName())
5✔
716
                                        if found && _packageStatus.Stage == v1alpha1.StageUpgrade {
10✔
717
                                                continue
5✔
718
                                        }
719

720
                                        // start upgrade of package
721
                                        err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageUpgrade, 0, _package.ContainerSHA)
5✔
722
                                        if err != nil {
5✔
723
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
724
                                        }
×
725

726
                                        upgrade = true
5✔
727
                                } else if comparison == -1 && packageStatus.Stage != v1alpha1.StageUninstall {
10✔
728
                                        // Start uninstall of old package
5✔
729
                                        err := node.Upsert(packageStatusRef, packageStatus.Image, v1alpha1.StateInProgress, v1alpha1.StageUninstall, 0, "")
5✔
730
                                        if err != nil {
5✔
731
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
732
                                        }
×
733

734
                                        // If version changed then update new version to wait
735
                                        err = node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, v1alpha1.StageUninstall, 0, _package.ContainerSHA)
5✔
736
                                        if err != nil {
5✔
737
                                                return nil, fmt.Errorf("error updating node status: %w", err)
×
738
                                        }
×
739
                                }
740
                        }
741

742
                        // only need to create a feaux package for uninstall since it won't be in the DAG (Upgrade will)
743
                        newPackageStatus, found := node.PackageStatus(packageStatusRef.GetUniqueName())
5✔
744
                        if !upgrade && found && newPackageStatus.Stage == v1alpha1.StageUninstall && newPackageStatus.State == v1alpha1.StateInProgress {
10✔
745
                                // create fake package with the info we can salvage from the node state
5✔
746
                                newPackage := &v1alpha1.Package{
5✔
747
                                        PackageRef: packageStatusRef,
5✔
748
                                        Image:      packageStatus.Image,
5✔
749
                                }
5✔
750

5✔
751
                                // Add package to uninstall list if it's not already present
5✔
752
                                found := false
5✔
753
                                for _, uninstallPackage := range toUninstall {
10✔
754
                                        if reflect.DeepEqual(uninstallPackage, newPackage) {
5✔
755
                                                found = true
×
756
                                        }
×
757
                                }
758

759
                                if !found {
10✔
760
                                        toUninstall = append(toUninstall, newPackage)
5✔
761
                                }
5✔
762
                        }
763

764
                        // remove all config updates for the package since it's being uninstalled or
765
                        // upgraded. NOTE: The config updates must be removed whenever the version changes
766
                        // or else the package interrupt may be skipped if there is one
767
                        skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
768

5✔
769
                        // set the node and skyhook status to in progress
5✔
770
                        node.SetStatus(v1alpha1.StatusInProgress)
5✔
771
                }
772
        }
773

774
        return toUninstall, nil
7✔
775
}
776

777
// ptr returns a pointer to a fresh copy of the given value; handy for taking
// the address of a literal or an expression result.
func ptr[E any](e E) *E {
	out := new(E)
	*out = e
	return out
}
8✔
781

782
// generateSafeName generates a consistent name for Kubernetes resources that is unique
// while staying within the specified character limit.
//
// The name parts are joined with "-", dots are replaced with dashes, and the first 8 hex
// characters of the SHA-256 of that joined name are appended as a suffix. If the joined name
// does not fit, it is truncated so that "<name>-<hash>" is exactly maxLen characters long.
// The result is lower-cased.
//
// When maxLen leaves no room for any part of the readable name (maxLen <= 9), only the hash
// is returned, truncated to maxLen if needed; a non-positive maxLen yields "".
func generateSafeName(maxLen int, nameParts ...string) string {
	name := strings.Join(nameParts, "-")
	// Replace dots with dashes as they're not allowed in resource names
	name = strings.ReplaceAll(name, ".", "-")

	unique := sha256.Sum256([]byte(name))
	uniqueStr := hex.EncodeToString(unique[:])[:8]

	// room left for the readable part once "-<hash>" is accounted for
	maxlen := maxLen - len(uniqueStr) - 1
	if maxlen <= 0 {
		// No room for the readable part: slicing with a negative bound would panic and an
		// empty prefix would produce a name starting with "-", so return just the hash.
		switch {
		case maxLen <= 0:
			return ""
		case maxLen < len(uniqueStr):
			return uniqueStr[:maxLen]
		default:
			return uniqueStr
		}
	}

	if len(name) > maxlen {
		name = name[:maxlen]
	}

	return strings.ToLower(fmt.Sprintf("%s-%s", name, uniqueStr))
}
799

800
func (r *SkyhookReconciler) UpsertNodeLabelsAnnotationsPackages(ctx context.Context, skyhook *wrapper.Skyhook, node *corev1.Node) error {
8✔
801
        // No work to do if there is no labels or annotations for node
8✔
802
        if len(node.Labels) == 0 && len(node.Annotations) == 0 {
8✔
803
                return nil
×
804
        }
×
805

806
        annotations, err := json.Marshal(node.Annotations)
8✔
807
        if err != nil {
8✔
808
                return fmt.Errorf("error converting annotations into byte array: %w", err)
×
809
        }
×
810

811
        labels, err := json.Marshal(node.Labels)
8✔
812
        if err != nil {
8✔
813
                return fmt.Errorf("error converting labels into byte array: %w", err)
×
814
        }
×
815

816
        // marshal intermediary package metadata for the agent
817
        metadata := NewSkyhookMetadata(r.opts, skyhook)
8✔
818
        packages, err := metadata.Marshal()
8✔
819
        if err != nil {
8✔
820
                return fmt.Errorf("error converting packages into byte array: %w", err)
×
821
        }
×
822

823
        configMapName := generateSafeName(253, skyhook.Name, node.Name, "metadata")
8✔
824
        newCM := &corev1.ConfigMap{
8✔
825
                ObjectMeta: metav1.ObjectMeta{
8✔
826
                        Name:      configMapName,
8✔
827
                        Namespace: r.opts.Namespace,
8✔
828
                        Labels: map[string]string{
8✔
829
                                fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhook.Name,
8✔
830
                        },
8✔
831
                        Annotations: map[string]string{
8✔
832
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
8✔
833
                                fmt.Sprintf("%s/Node.name", v1alpha1.METADATA_PREFIX): node.Name,
8✔
834
                        },
8✔
835
                },
8✔
836
                Data: map[string]string{
8✔
837
                        "annotations.json": string(annotations),
8✔
838
                        "labels.json":      string(labels),
8✔
839
                        "packages.json":    string(packages),
8✔
840
                },
8✔
841
        }
8✔
842

8✔
843
        if err := ctrl.SetControllerReference(skyhook.Skyhook, newCM, r.scheme); err != nil {
8✔
844
                return fmt.Errorf("error setting ownership: %w", err)
×
845
        }
×
846

847
        existingConfigMap := &corev1.ConfigMap{}
8✔
848
        err = r.Get(ctx, client.ObjectKey{Namespace: r.opts.Namespace, Name: configMapName}, existingConfigMap)
8✔
849
        if err != nil {
16✔
850
                if apierrors.IsNotFound(err) {
16✔
851
                        // create
8✔
852
                        err := r.Create(ctx, newCM)
8✔
853
                        if err != nil {
8✔
854
                                return fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
855
                        }
×
856
                } else {
×
857
                        return fmt.Errorf("error getting config map: %w", err)
×
858
                }
×
859
        } else {
7✔
860
                if !reflect.DeepEqual(existingConfigMap.Data, newCM.Data) {
14✔
861
                        // update
7✔
862
                        err := r.Update(ctx, newCM)
7✔
863
                        if err != nil {
7✔
864
                                return fmt.Errorf("error updating config map [%s]: %w", newCM.Name, err)
×
865
                        }
×
866
                }
867
        }
868

869
        return nil
8✔
870
}
871

872
// HandleConfigUpdates checks whether the configMap on a package was updated and if it was the configmap will
873
// be updated and the package will be put into config mode if the package is complete or erroring
874
func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterState *clusterState, skyhook SkyhookNodes, _package v1alpha1.Package, oldConfigMap, newConfigMap *corev1.ConfigMap) (bool, error) {
6✔
875
        completedNodes, nodeCount := 0, len(skyhook.GetNodes())
6✔
876
        erroringNode := false
6✔
877

6✔
878
        // if configmap changed
6✔
879
        if !reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) {
11✔
880
                for _, node := range skyhook.GetNodes() {
10✔
881
                        exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package)
5✔
882
                        if err != nil {
5✔
883
                                return false, err
×
884
                        }
×
885

886
                        if !exists && node.IsPackageComplete(_package) {
10✔
887
                                completedNodes++
5✔
888
                        }
5✔
889

890
                        // if we have an erroring node in the config, interrupt, or post-interrupt mode
891
                        // then we will restart the config changes
892
                        if packageStatus, found := node.PackageStatus(_package.GetUniqueName()); found {
10✔
893
                                switch packageStatus.Stage {
5✔
894
                                case v1alpha1.StageConfig, v1alpha1.StageInterrupt, v1alpha1.StagePostInterrupt:
5✔
895
                                        if packageStatus.State == v1alpha1.StateErroring {
5✔
896
                                                erroringNode = true
×
897

×
898
                                                // delete the erroring pod from the node so that it can be recreated
×
899
                                                // with the updated configmap
×
900
                                                pods, err := r.dal.GetPods(ctx,
×
901
                                                        client.MatchingFields{
×
902
                                                                "spec.nodeName": node.GetNode().Name,
×
903
                                                        },
×
904
                                                        client.MatchingLabels{
×
905
                                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.GetSkyhook().Name,
×
906
                                                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
×
907
                                                        },
×
908
                                                )
×
909
                                                if err != nil {
×
910
                                                        return false, err
×
911
                                                }
×
912

913
                                                if pods != nil {
×
914
                                                        for _, pod := range pods.Items {
×
915
                                                                err := r.Delete(ctx, &pod)
×
916
                                                                if err != nil {
×
917
                                                                        return false, err
×
918
                                                                }
×
919
                                                        }
920
                                                }
921
                                        }
922
                                }
923
                        }
924
                }
925

926
                // if the update is complete or there is an erroring node put the package back into
927
                // the config mode and update the config map
928
                if completedNodes == nodeCount || erroringNode {
10✔
929
                        // get the keys in the configmap that changed
5✔
930
                        newConfigUpdates := make([]string, 0)
5✔
931
                        for key, new_val := range newConfigMap.Data {
10✔
932
                                if old_val, exists := oldConfigMap.Data[key]; !exists || old_val != new_val {
10✔
933
                                        newConfigUpdates = append(newConfigUpdates, key)
5✔
934
                                }
5✔
935
                        }
936

937
                        // if updates completed then clear out old config updates as they are finished
938
                        if completedNodes == nodeCount {
10✔
939
                                skyhook.GetSkyhook().RemoveConfigUpdates(_package.Name)
5✔
940
                        }
5✔
941

942
                        // Add the new changed keys to the config updates
943
                        skyhook.GetSkyhook().AddConfigUpdates(_package.Name, newConfigUpdates...)
5✔
944

5✔
945
                        for _, node := range skyhook.GetNodes() {
10✔
946
                                err := node.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageConfig, 0, _package.ContainerSHA)
5✔
947
                                if err != nil {
5✔
948
                                        return false, fmt.Errorf("error upserting node status [%s]: %w", node.GetNode().Name, err)
×
949
                                }
×
950

951
                                node.SetStatus(v1alpha1.StatusInProgress)
5✔
952
                        }
953

954
                        _, errs := r.SaveNodesAndSkyhook(ctx, clusterState, skyhook)
5✔
955
                        if len(errs) > 0 {
5✔
956
                                return false, utilerrors.NewAggregate(errs)
×
957
                        }
×
958

959
                        // update config map
960
                        err := r.Update(ctx, newConfigMap)
5✔
961
                        if err != nil {
5✔
962
                                return false, fmt.Errorf("error updating config map [%s]: %w", newConfigMap.Name, err)
×
963
                        }
×
964

965
                        return true, nil
5✔
966
                }
967
        }
968

969
        return false, nil
6✔
970
}
971

972
func (r *SkyhookReconciler) UpsertConfigmaps(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, error) {
7✔
973
        updated := false
7✔
974

7✔
975
        var list corev1.ConfigMapList
7✔
976
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name})
7✔
977
        if err != nil {
7✔
978
                return false, fmt.Errorf("error listing config maps while upserting: %w", err)
×
979
        }
×
980

981
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
982
        for _, cm := range list.Items {
13✔
983
                existingCMs[cm.Name] = cm
6✔
984
        }
6✔
985

986
        // clean up from an update
987
        shouldExist := make(map[string]struct{})
7✔
988
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
989
                shouldExist[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))] = struct{}{}
7✔
990
        }
7✔
991

992
        for k, v := range existingCMs {
13✔
993
                if _, ok := shouldExist[k]; !ok {
11✔
994
                        // delete
5✔
995
                        err := r.Delete(ctx, &v)
5✔
996
                        if err != nil {
5✔
997
                                return false, fmt.Errorf("error deleting existing config map [%s] while upserting: %w", v.Name, err)
×
998
                        }
×
999
                }
1000
        }
1001

1002
        for _, _package := range skyhook.GetSkyhook().Spec.Packages {
14✔
1003
                if len(_package.ConfigMap) > 0 {
13✔
1004

6✔
1005
                        newCM := &corev1.ConfigMap{
6✔
1006
                                ObjectMeta: metav1.ObjectMeta{
6✔
1007
                                        Name:      strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version)),
6✔
1008
                                        Namespace: r.opts.Namespace,
6✔
1009
                                        Labels: map[string]string{
6✔
1010
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
6✔
1011
                                        },
6✔
1012
                                        Annotations: map[string]string{
6✔
1013
                                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):            skyhook.GetSkyhook().Name,
6✔
1014
                                                fmt.Sprintf("%s/Package.Name", v1alpha1.METADATA_PREFIX):    _package.Name,
6✔
1015
                                                fmt.Sprintf("%s/Package.Version", v1alpha1.METADATA_PREFIX): _package.Version,
6✔
1016
                                        },
6✔
1017
                                },
6✔
1018
                                Data: _package.ConfigMap,
6✔
1019
                        }
6✔
1020
                        // set owner of CM to the SCR, which will clean up the CM in delete of the SCR
6✔
1021
                        if err := ctrl.SetControllerReference(skyhook.GetSkyhook().Skyhook, newCM, r.scheme); err != nil {
6✔
1022
                                return false, fmt.Errorf("error setting ownership of cm: %w", err)
×
1023
                        }
×
1024

1025
                        if existingCM, ok := existingCMs[strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.GetSkyhook().Name, _package.Name, _package.Version))]; ok {
12✔
1026
                                updatedConfigMap, err := r.HandleConfigUpdates(ctx, clusterState, skyhook, _package, &existingCM, newCM)
6✔
1027
                                if err != nil {
6✔
1028
                                        return false, fmt.Errorf("error updating config map [%s]: %s", newCM.Name, err)
×
1029
                                }
×
1030
                                if updatedConfigMap {
11✔
1031
                                        updated = true
5✔
1032
                                }
5✔
1033
                        } else {
6✔
1034
                                // create
6✔
1035
                                err := r.Create(ctx, newCM)
6✔
1036
                                if err != nil {
6✔
1037
                                        return false, fmt.Errorf("error creating config map [%s]: %w", newCM.Name, err)
×
1038
                                }
×
1039
                        }
1040
                }
1041
        }
1042

1043
        return updated, nil
7✔
1044
}
1045

1046
func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1047

5✔
1048
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1049
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1050
        })
5✔
1051
        if err != nil {
5✔
1052
                return false, err
×
1053
        }
×
1054

1055
        if pods == nil || len(pods.Items) == 0 {
5✔
1056
                return true, nil
×
1057
        }
×
1058

1059
        // checking for any running or pending pods with no toleration to unschedulable
1060
        // if its has an unschedulable toleration we can ignore
1061
        for _, pod := range pods.Items {
10✔
1062

5✔
1063
                if ShouldEvict(&pod) {
10✔
1064
                        return false, nil
5✔
1065
                }
5✔
1066

1067
        }
1068

1069
        return true, nil
5✔
1070
}
1071

1072
func ShouldEvict(pod *corev1.Pod) bool {
5✔
1073
        switch pod.Status.Phase {
5✔
1074
        case corev1.PodRunning, corev1.PodPending:
5✔
1075

5✔
1076
                for _, taint := range pod.Spec.Tolerations {
10✔
1077
                        switch taint.Key {
5✔
1078
                        case "node.kubernetes.io/unschedulable": // ignoring
5✔
1079
                                return false
5✔
1080
                        }
1081
                }
1082

1083
                if len(pod.ObjectMeta.OwnerReferences) > 1 {
5✔
1084
                        for _, owner := range pod.ObjectMeta.OwnerReferences {
×
1085
                                if owner.Kind == "DaemonSet" { // ignoring
×
1086
                                        return false
×
1087
                                }
×
1088
                        }
1089
                }
1090

1091
                if pod.GetNamespace() == "kube-system" {
5✔
1092
                        return false
×
1093
                }
×
1094

1095
                return true
5✔
1096
        }
1097
        return false
5✔
1098
}
1099

1100
// HandleFinalizer returns true only if we container is deleted and we handled it completely, else false
1101
func (r *SkyhookReconciler) HandleFinalizer(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1102
        if skyhook.GetSkyhook().DeletionTimestamp.IsZero() { // if not deleted, and does not have our finalizer, add it
14✔
1103
                if !controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1104
                        controllerutil.AddFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1105

7✔
1106
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
7✔
1107
                                return false, fmt.Errorf("error updating skyhook to add finalizer: %w", err)
×
1108
                        }
×
1109
                }
1110
        } else { // being delete, time to handle our
7✔
1111
                if controllerutil.ContainsFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer) {
14✔
1112

7✔
1113
                        errs := make([]error, 0)
7✔
1114

7✔
1115
                        // zero out all the metrics related to this skyhook both skyhook and packages
7✔
1116
                        zeroOutSkyhookMetrics(skyhook)
7✔
1117

7✔
1118
                        for _, node := range skyhook.GetNodes() {
13✔
1119
                                patch := client.StrategicMergeFrom(node.GetNode().DeepCopy())
6✔
1120

6✔
1121
                                node.Uncordon()
6✔
1122

6✔
1123
                                // if this doesn't change the node then don't patch
6✔
1124
                                if !node.Changed() {
12✔
1125
                                        continue
6✔
1126
                                }
1127

1128
                                err := r.Patch(ctx, node.GetNode(), patch)
4✔
1129
                                if err != nil {
4✔
1130
                                        errs = append(errs, fmt.Errorf("error patching node [%s] in finalizer: %w", node.GetNode().Name, err))
×
1131
                                }
×
1132
                        }
1133

1134
                        if len(errs) > 0 { // we errored, so we need to return error, otherwise we would release the skyhook when we didnt finish
7✔
1135
                                return false, utilerrors.NewAggregate(errs)
×
1136
                        }
×
1137

1138
                        controllerutil.RemoveFinalizer(skyhook.GetSkyhook().Skyhook, SkyhookFinalizer)
7✔
1139
                        if err := r.Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
11✔
1140
                                return false, fmt.Errorf("error updating skyhook removing finalizer: %w", err)
4✔
1141
                        }
4✔
1142
                        // should be 1, and now 2. we want to set ObservedGeneration up to not trigger an logic from this update adding the finalizer
1143
                        skyhook.GetSkyhook().Status.ObservedGeneration = skyhook.GetSkyhook().Status.ObservedGeneration + 1
7✔
1144

7✔
1145
                        if err := r.Status().Update(ctx, skyhook.GetSkyhook().Skyhook); err != nil {
14✔
1146
                                return false, fmt.Errorf("error updating skyhook status: %w", err)
7✔
1147
                        }
7✔
1148

1149
                        return true, nil
×
1150
                }
1151
        }
1152
        return false, nil
7✔
1153
}
1154

1155
// HasNonInterruptWork returns true if pods are running on the node that are either packages, or matches the SCR selector
1156
func (r *SkyhookReconciler) HasNonInterruptWork(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1157

5✔
1158
        selector, err := metav1.LabelSelectorAsSelector(&skyhookNode.GetSkyhook().Spec.PodNonInterruptLabels)
5✔
1159
        if err != nil {
5✔
1160
                return false, fmt.Errorf("error creating selector: %w", err)
×
1161
        }
×
1162

1163
        if selector.Empty() { // when selector is empty it does not do any selecting, ie will return all pods on node.
10✔
1164
                return false, nil
5✔
1165
        }
5✔
1166

1167
        pods, err := r.dal.GetPods(ctx,
5✔
1168
                client.MatchingLabelsSelector{Selector: selector},
5✔
1169
                client.MatchingFields{
5✔
1170
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1171
                },
5✔
1172
        )
5✔
1173
        if err != nil {
5✔
1174
                return false, fmt.Errorf("error getting pods: %w", err)
×
1175
        }
×
1176

1177
        if pods == nil || len(pods.Items) == 0 {
10✔
1178
                return false, nil
5✔
1179
        }
5✔
1180

1181
        for _, pod := range pods.Items {
10✔
1182
                switch pod.Status.Phase {
5✔
1183
                case corev1.PodRunning, corev1.PodPending:
5✔
1184
                        return true, nil
5✔
1185
                }
1186
        }
1187

1188
        return false, nil
×
1189
}
1190

1191
func (r *SkyhookReconciler) HasRunningPackages(ctx context.Context, skyhookNode wrapper.SkyhookNode) (bool, error) {
5✔
1192
        pods, err := r.dal.GetPods(ctx,
5✔
1193
                client.HasLabels{fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX)},
5✔
1194
                client.MatchingFields{
5✔
1195
                        "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1196
                },
5✔
1197
        )
5✔
1198
        if err != nil {
5✔
1199
                return false, fmt.Errorf("error getting pods: %w", err)
×
1200
        }
×
1201

1202
        return pods != nil && len(pods.Items) > 0, nil
5✔
1203
}
1204

1205
func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
5✔
1206
        drained, err := r.IsDrained(ctx, skyhookNode)
5✔
1207
        if err != nil {
5✔
1208
                return false, err
×
1209
        }
×
1210
        if drained {
10✔
1211
                return true, nil
5✔
1212
        }
5✔
1213

1214
        pods, err := r.dal.GetPods(ctx, client.MatchingFields{
5✔
1215
                "spec.nodeName": skyhookNode.GetNode().Name,
5✔
1216
        })
5✔
1217
        if err != nil {
5✔
1218
                return false, err
×
1219
        }
×
1220

1221
        if pods == nil || len(pods.Items) == 0 {
5✔
1222
                return true, nil
×
1223
        }
×
1224

1225
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1226
                "draining node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1227
                skyhookNode.GetNode().Name,
5✔
1228
                _package.Name,
5✔
1229
                _package.Version,
5✔
1230
                skyhookNode.GetSkyhook().Name,
5✔
1231
        )
5✔
1232

5✔
1233
        errs := make([]error, 0)
5✔
1234
        for _, pod := range pods.Items {
10✔
1235

5✔
1236
                if ShouldEvict(&pod) {
10✔
1237
                        eviction := policyv1.Eviction{}
5✔
1238
                        err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction)
5✔
1239
                        if err != nil {
5✔
1240
                                errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err))
×
1241
                        }
×
1242
                }
1243
        }
1244

1245
        return len(errs) == 0, utilerrors.NewAggregate(errs)
5✔
1246
}
1247

1248
// Interrupt should not be called unless safe to do so, IE already cordoned and drained
1249
func (r *SkyhookReconciler) Interrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, _interrupt *v1alpha1.Interrupt) error {
5✔
1250

5✔
1251
        hasPackagesRunning, err := r.HasRunningPackages(ctx, skyhookNode)
5✔
1252
        if err != nil {
5✔
1253
                return err
×
1254
        }
×
1255

1256
        if hasPackagesRunning { // keep waiting...
10✔
1257
                return nil
5✔
1258
        }
5✔
1259

1260
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
5✔
1261
        if err != nil {
5✔
1262
                return err
×
1263
        }
×
1264
        if exists {
5✔
1265
                // nothing to do here, already running
×
1266
                return nil
×
1267
        }
×
1268

1269
        argEncode, err := _interrupt.ToArgs()
5✔
1270
        if err != nil {
5✔
1271
                return fmt.Errorf("error creating interrupt args: %w", err)
×
1272
        }
×
1273

1274
        pod := createInterruptPodForPackage(r.opts, _interrupt, argEncode, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name)
5✔
1275

5✔
1276
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, v1alpha1.StageInterrupt, _package); err != nil {
5✔
1277
                return fmt.Errorf("error setting package on interrupt: %w", err)
×
1278
        }
×
1279

1280
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
5✔
1281
                return fmt.Errorf("error setting ownership: %w", err)
×
1282
        }
×
1283

1284
        if err := r.Create(ctx, pod); err != nil {
5✔
1285
                return fmt.Errorf("error creating interruption pod: %w", err)
×
1286
        }
×
1287

1288
        _ = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, v1alpha1.StageInterrupt, 0, _package.ContainerSHA)
5✔
1289

5✔
1290
        r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, EventTypeNormal, EventsReasonSkyhookInterrupt,
5✔
1291
                "Interrupting node [%s] package [%s:%s] from [skyhook:%s]",
5✔
1292
                skyhookNode.GetNode().Name,
5✔
1293
                _package.Name,
5✔
1294
                _package.Version,
5✔
1295
                skyhookNode.GetSkyhook().Name)
5✔
1296

5✔
1297
        return nil
5✔
1298
}
1299

1300
// fudgeInterruptWithPriority takes a list of packages, interrupts, and configUpdates and returns the correct merged interrupt to run to handle all the packages
1301
func fudgeInterruptWithPriority(next []*v1alpha1.Package, configUpdates map[string][]string, interrupts map[string][]*v1alpha1.Interrupt) (*v1alpha1.Interrupt, string) {
8✔
1302
        var ret *v1alpha1.Interrupt
8✔
1303
        var pack string
8✔
1304

8✔
1305
        // map interrupt to priority
8✔
1306
        // A lower priority value means a higher priority and will be used in favor of anything with a higher value
8✔
1307
        var priorities = map[v1alpha1.InterruptType]int{
8✔
1308
                v1alpha1.REBOOT:               0,
8✔
1309
                v1alpha1.RESTART_ALL_SERVICES: 1,
8✔
1310
                v1alpha1.SERVICE:              2,
8✔
1311
                v1alpha1.NOOP:                 3,
8✔
1312
        }
8✔
1313

8✔
1314
        for _, _package := range next {
16✔
1315

8✔
1316
                if len(configUpdates[_package.Name]) == 0 {
16✔
1317
                        interrupts[_package.Name] = []*v1alpha1.Interrupt{}
8✔
1318
                        if _package.HasInterrupt() {
14✔
1319
                                interrupts[_package.Name] = append(interrupts[_package.Name], _package.Interrupt)
6✔
1320
                        }
6✔
1321
                }
1322
        }
1323

1324
        packageNames := make([]string, 0)
8✔
1325
        for _, pkg := range next {
16✔
1326
                packageNames = append(packageNames, pkg.Name)
8✔
1327
        }
8✔
1328
        sort.Strings(packageNames)
8✔
1329

8✔
1330
        for _, _package := range packageNames {
16✔
1331
                _interrupts, ok := interrupts[_package]
8✔
1332
                if !ok {
14✔
1333
                        continue
6✔
1334
                }
1335

1336
                for _, interrupt := range _interrupts {
14✔
1337
                        if ret == nil { // prime ret, base case
12✔
1338
                                ret = interrupt
6✔
1339
                                pack = _package
6✔
1340
                        }
6✔
1341

1342
                        // short circuit, reboot has highest priority
1343
                        switch interrupt.Type {
6✔
1344
                        case v1alpha1.REBOOT:
6✔
1345
                                return interrupt, _package
6✔
1346
                        }
1347

1348
                        // check if interrupt is higher priority using the priority_order
1349
                        // A lower priority value means a higher priority
1350
                        if priorities[interrupt.Type] < priorities[ret.Type] {
7✔
1351
                                ret = interrupt
1✔
1352
                                pack = _package
1✔
1353
                        } else if priorities[interrupt.Type] == priorities[ret.Type] {
13✔
1354
                                mergeInterrupt(ret, interrupt)
6✔
1355
                        }
6✔
1356
                }
1357
        }
1358

1359
        return ret, pack // return merged interrupt and package
8✔
1360
}
1361

1362
func mergeInterrupt(left, right *v1alpha1.Interrupt) {
6✔
1363

6✔
1364
        // make sure both are of type service
6✔
1365
        if left.Type != v1alpha1.SERVICE || right.Type != v1alpha1.SERVICE {
7✔
1366
                return
1✔
1367
        }
1✔
1368

1369
        left.Services = merge(left.Services, right.Services)
6✔
1370
}
1371

1372
// merge returns the sorted union of left and right: every element of left, plus each
// element of right that is not already present.
//
// The result is built in a fresh slice, so neither input (nor any slice sharing a
// backing array with left) is appended to or reordered. A nil left with nothing to
// add yields a nil result.
func merge[T cmp.Ordered](left, right []T) []T {
	merged := slices.Clone(left)
	for _, r := range right {
		if !slices.Contains(merged, r) {
			merged = append(merged, r)
		}
	}
	slices.Sort(merged)
	return merged
}
1381

1382
// ValidateNodeConfigmaps validates that there are no orphaned or stale config maps for a node
1383
func (r *SkyhookReconciler) ValidateNodeConfigmaps(ctx context.Context, skyhookName string, nodes []wrapper.SkyhookNode) (bool, error) {
7✔
1384
        var list corev1.ConfigMapList
7✔
1385
        err := r.List(ctx, &list, client.InNamespace(r.opts.Namespace), client.MatchingLabels{fmt.Sprintf("%s/skyhook-node-meta", v1alpha1.METADATA_PREFIX): skyhookName})
7✔
1386
        if err != nil {
7✔
1387
                return false, fmt.Errorf("error listing config maps: %w", err)
×
1388
        }
×
1389

1390
        // No configmaps created by this skyhook, no work needs to be done
1391
        if len(list.Items) == 0 {
14✔
1392
                return false, nil
7✔
1393
        }
7✔
1394

1395
        existingCMs := make(map[string]corev1.ConfigMap)
7✔
1396
        for _, cm := range list.Items {
14✔
1397
                existingCMs[cm.Name] = cm
7✔
1398
        }
7✔
1399

1400
        shouldExist := make(map[string]struct{})
7✔
1401
        for _, node := range nodes {
14✔
1402
                shouldExist[generateSafeName(253, skyhookName, node.GetNode().Name, "metadata")] = struct{}{}
7✔
1403
        }
7✔
1404

1405
        update := false
7✔
1406
        errs := make([]error, 0)
7✔
1407
        for k, v := range existingCMs {
14✔
1408
                if _, ok := shouldExist[k]; !ok {
7✔
1409
                        update = true
×
1410
                        err := r.Delete(ctx, &v)
×
1411
                        if err != nil {
×
1412
                                errs = append(errs, fmt.Errorf("error deleting existing config map [%s]: %w", v.Name, err))
×
1413
                        }
×
1414
                }
1415
        }
1416

1417
        // Ensure packages.json is present and up-to-date for expected configmaps
1418
        skyhookCR, err := r.dal.GetSkyhook(ctx, skyhookName)
7✔
1419
        if err != nil {
7✔
1420
                return update, fmt.Errorf("error getting skyhook for metadata validation: %w", err)
×
1421
        }
×
1422
        skyhookWrapper := wrapper.NewSkyhookWrapper(skyhookCR)
7✔
1423
        metadata := NewSkyhookMetadata(r.opts, skyhookWrapper)
7✔
1424
        expectedBytes, err := metadata.Marshal()
7✔
1425
        if err != nil {
7✔
1426
                return update, fmt.Errorf("error marshalling metadata for validation: %w", err)
×
1427
        }
×
1428
        expected := string(expectedBytes)
7✔
1429

7✔
1430
        for i := range list.Items {
14✔
1431
                cm := &list.Items[i]
7✔
1432
                if _, ok := shouldExist[cm.Name]; !ok {
7✔
1433
                        continue
×
1434
                }
1435
                if cm.Data == nil {
7✔
1436
                        cm.Data = make(map[string]string)
×
1437
                }
×
1438
                if cm.Data["packages.json"] != expected {
12✔
1439
                        cm.Data["packages.json"] = expected
5✔
1440
                        if err := r.Update(ctx, cm); err != nil {
7✔
1441
                                errs = append(errs, fmt.Errorf("error updating packages.json on config map [%s]: %w", cm.Name, err))
2✔
1442
                        } else {
7✔
1443
                                update = true
5✔
1444
                        }
5✔
1445
                }
1446
        }
1447

1448
        return update, utilerrors.NewAggregate(errs)
7✔
1449
}
1450

1451
// PodExists tests if this package is exists on a node.
1452
func (r *SkyhookReconciler) PodExists(ctx context.Context, nodeName, skyhookName string, _package *v1alpha1.Package) (bool, error) {
7✔
1453

7✔
1454
        pods, err := r.dal.GetPods(ctx,
7✔
1455
                client.MatchingFields{
7✔
1456
                        "spec.nodeName": nodeName,
7✔
1457
                },
7✔
1458
                client.MatchingLabels{
7✔
1459
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhookName,
7✔
1460
                        fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
7✔
1461
                },
7✔
1462
        )
7✔
1463
        if err != nil {
7✔
1464
                return false, fmt.Errorf("error check from existing pods: %w", err)
×
1465
        }
×
1466

1467
        if pods == nil || len(pods.Items) == 0 {
14✔
1468
                return false, nil
7✔
1469
        }
7✔
1470
        return true, nil
7✔
1471
}
1472

1473
// createInterruptPodForPackage returns the pod spec for an interrupt pod given an package
1474
func createInterruptPodForPackage(opts SkyhookOperatorOptions, _interrupt *v1alpha1.Interrupt, argEncode string, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string) *corev1.Pod {
6✔
1475
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
6✔
1476
                opts.CopyDirRoot,
6✔
1477
                skyhook.Name,
6✔
1478
                _package.Name,
6✔
1479
                _package.Version,
6✔
1480
                skyhook.UID,
6✔
1481
                skyhook.Generation,
6✔
1482
        )
6✔
1483

6✔
1484
        volumes := []corev1.Volume{
6✔
1485
                {
6✔
1486
                        Name: "root-mount",
6✔
1487
                        VolumeSource: corev1.VolumeSource{
6✔
1488
                                HostPath: &corev1.HostPathVolumeSource{
6✔
1489
                                        Path: "/",
6✔
1490
                                },
6✔
1491
                        },
6✔
1492
                },
6✔
1493
                {
6✔
1494
                        // node names in different CSPs might include dots which isn't allowed in volume names
6✔
1495
                        // so we have to replace all dots with dashes
6✔
1496
                        Name: generateSafeName(63, skyhook.Name, nodeName, "metadata"),
6✔
1497
                        VolumeSource: corev1.VolumeSource{
6✔
1498
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1499
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1500
                                                Name: strings.ReplaceAll(fmt.Sprintf("%s-%s-metadata", skyhook.Name, nodeName), ".", "-"),
6✔
1501
                                        },
6✔
1502
                                },
6✔
1503
                        },
6✔
1504
                },
6✔
1505
        }
6✔
1506
        volumeMounts := []corev1.VolumeMount{
6✔
1507
                {
6✔
1508
                        Name:             "root-mount",
6✔
1509
                        MountPath:        "/root",
6✔
1510
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
6✔
1511
                },
6✔
1512
        }
6✔
1513

6✔
1514
        pod := &corev1.Pod{
6✔
1515
                ObjectMeta: metav1.ObjectMeta{
6✔
1516
                        Name:      generateSafeName(63, skyhook.Name, "interrupt", string(_interrupt.Type), nodeName),
6✔
1517
                        Namespace: opts.Namespace,
6✔
1518
                        Labels: map[string]string{
6✔
1519
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):      skyhook.Name,
6✔
1520
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX):   fmt.Sprintf("%s-%s", _package.Name, _package.Version),
6✔
1521
                                fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX): "True",
6✔
1522
                        },
6✔
1523
                },
6✔
1524
                Spec: corev1.PodSpec{
6✔
1525
                        NodeName:      nodeName,
6✔
1526
                        RestartPolicy: corev1.RestartPolicyOnFailure,
6✔
1527
                        InitContainers: []corev1.Container{
6✔
1528
                                {
6✔
1529
                                        Name:  InterruptContainerName,
6✔
1530
                                        Image: getAgentImage(opts, _package),
6✔
1531
                                        Args:  []string{"interrupt", "/root", copyDir, argEncode},
6✔
1532
                                        Env:   getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name),
6✔
1533
                                        SecurityContext: &corev1.SecurityContext{
6✔
1534
                                                Privileged: ptr(true),
6✔
1535
                                        },
6✔
1536
                                        VolumeMounts: volumeMounts,
6✔
1537
                                        Resources: corev1.ResourceRequirements{
6✔
1538
                                                Limits: corev1.ResourceList{
6✔
1539
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1540
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1541
                                                },
6✔
1542
                                                Requests: corev1.ResourceList{
6✔
1543
                                                        corev1.ResourceCPU:    resource.MustParse("500m"),
6✔
1544
                                                        corev1.ResourceMemory: resource.MustParse("64Mi"),
6✔
1545
                                                },
6✔
1546
                                        },
6✔
1547
                                },
6✔
1548
                        },
6✔
1549
                        Containers: []corev1.Container{
6✔
1550
                                {
6✔
1551
                                        Name:  "pause",
6✔
1552
                                        Image: opts.PauseImage,
6✔
1553
                                        Resources: corev1.ResourceRequirements{
6✔
1554
                                                Limits: corev1.ResourceList{
6✔
1555
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1556
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1557
                                                },
6✔
1558
                                                Requests: corev1.ResourceList{
6✔
1559
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
6✔
1560
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
6✔
1561
                                                },
6✔
1562
                                        },
6✔
1563
                                },
6✔
1564
                        },
6✔
1565
                        ImagePullSecrets: []corev1.LocalObjectReference{
6✔
1566
                                {
6✔
1567
                                        Name: opts.ImagePullSecret,
6✔
1568
                                },
6✔
1569
                        },
6✔
1570
                        HostPID:     true,
6✔
1571
                        HostNetwork: true,
6✔
1572
                        // If you change these go change the SelectNode toleration in cluster_state.go
6✔
1573
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
6✔
1574
                                {
6✔
1575
                                        Key:      TaintUnschedulable,
6✔
1576
                                        Operator: corev1.TolerationOpExists,
6✔
1577
                                },
6✔
1578
                                opts.GetRuntimeRequiredToleration(),
6✔
1579
                        }, skyhook.Spec.AdditionalTolerations...),
6✔
1580
                        Volumes: volumes,
6✔
1581
                },
6✔
1582
        }
6✔
1583
        return pod
6✔
1584
}
6✔
1585

1586
// trunstr returns str cut down to at most length bytes; shorter strings are
// returned unchanged.
func trunstr(str string, length int) string {
	if len(str) <= length {
		return str
	}
	return str[:length]
}
1592

1593
func getAgentImage(opts SkyhookOperatorOptions, _package *v1alpha1.Package) string {
8✔
1594
        if _package.AgentImageOverride != "" {
13✔
1595
                return _package.AgentImageOverride
5✔
1596
        }
5✔
1597
        return opts.AgentImage
8✔
1598
}
1599

1600
// getPackageImage returns the full image reference for a package, using the digest if specified
1601
func getPackageImage(_package *v1alpha1.Package) string {
8✔
1602
        if _package.ContainerSHA != "" {
13✔
1603
                // When containerSHA is specified, use it instead of the version tag for immutable image reference
5✔
1604
                return fmt.Sprintf("%s@%s", _package.Image, _package.ContainerSHA)
5✔
1605
        }
5✔
1606
        // Fall back to version tag
1607
        return fmt.Sprintf("%s:%s", _package.Image, _package.Version)
8✔
1608
}
1609

1610
func getAgentConfigEnvVars(opts SkyhookOperatorOptions, packageName string, packageVersion string, resourceID string, skyhookName string) []corev1.EnvVar {
8✔
1611
        return []corev1.EnvVar{
8✔
1612
                {
8✔
1613
                        Name:  "SKYHOOK_LOG_DIR",
8✔
1614
                        Value: fmt.Sprintf("%s/%s", opts.AgentLogRoot, skyhookName),
8✔
1615
                },
8✔
1616
                {
8✔
1617
                        Name:  "SKYHOOK_ROOT_DIR",
8✔
1618
                        Value: fmt.Sprintf("%s/%s", opts.CopyDirRoot, skyhookName),
8✔
1619
                },
8✔
1620
                {
8✔
1621
                        Name:  "COPY_RESOLV",
8✔
1622
                        Value: "false",
8✔
1623
                },
8✔
1624
                {
8✔
1625
                        Name:  "SKYHOOK_RESOURCE_ID",
8✔
1626
                        Value: fmt.Sprintf("%s_%s_%s", resourceID, packageName, packageVersion),
8✔
1627
                },
8✔
1628
        }
8✔
1629
}
8✔
1630

1631
// createPodFromPackage creates a pod spec for a skyhook pod for a given package
1632
func createPodFromPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, skyhook *wrapper.Skyhook, nodeName string, stage v1alpha1.Stage) *corev1.Pod {
8✔
1633
        // Generate consistent names that won't exceed k8s limits
8✔
1634
        volumeName := generateSafeName(63, "metadata", nodeName)
8✔
1635
        configMapName := generateSafeName(253, skyhook.Name, nodeName, "metadata")
8✔
1636

8✔
1637
        volumes := []corev1.Volume{
8✔
1638
                {
8✔
1639
                        Name: "root-mount",
8✔
1640
                        VolumeSource: corev1.VolumeSource{
8✔
1641
                                HostPath: &corev1.HostPathVolumeSource{
8✔
1642
                                        Path: "/",
8✔
1643
                                },
8✔
1644
                        },
8✔
1645
                },
8✔
1646
                {
8✔
1647
                        Name: volumeName,
8✔
1648
                        VolumeSource: corev1.VolumeSource{
8✔
1649
                                ConfigMap: &corev1.ConfigMapVolumeSource{
8✔
1650
                                        LocalObjectReference: corev1.LocalObjectReference{
8✔
1651
                                                Name: configMapName,
8✔
1652
                                        },
8✔
1653
                                },
8✔
1654
                        },
8✔
1655
                },
8✔
1656
        }
8✔
1657

8✔
1658
        volumeMounts := []corev1.VolumeMount{
8✔
1659
                {
8✔
1660
                        Name:             "root-mount",
8✔
1661
                        MountPath:        "/root",
8✔
1662
                        MountPropagation: ptr(corev1.MountPropagationHostToContainer),
8✔
1663
                },
8✔
1664
                {
8✔
1665
                        Name:      volumeName,
8✔
1666
                        MountPath: "/skyhook-package/node-metadata",
8✔
1667
                },
8✔
1668
        }
8✔
1669

8✔
1670
        if len(_package.ConfigMap) > 0 {
14✔
1671
                volumeMounts = append(volumeMounts, corev1.VolumeMount{
6✔
1672
                        Name:      _package.Name,
6✔
1673
                        MountPath: "/skyhook-package/configmaps",
6✔
1674
                })
6✔
1675

6✔
1676
                volumes = append(volumes, corev1.Volume{
6✔
1677
                        Name: _package.Name,
6✔
1678
                        VolumeSource: corev1.VolumeSource{
6✔
1679
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
1680
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
1681
                                                Name: strings.ToLower(fmt.Sprintf("%s-%s-%s", skyhook.Name, _package.Name, _package.Version)),
6✔
1682
                                        },
6✔
1683
                                },
6✔
1684
                        },
6✔
1685
                })
6✔
1686
        }
6✔
1687

1688
        copyDir := fmt.Sprintf("%s/%s/%s-%s-%s-%d",
8✔
1689
                opts.CopyDirRoot,
8✔
1690
                skyhook.Name,
8✔
1691
                _package.Name,
8✔
1692
                _package.Version,
8✔
1693
                skyhook.UID,
8✔
1694
                skyhook.Generation,
8✔
1695
        )
8✔
1696
        applyargs := []string{strings.ToLower(string(stage)), "/root", copyDir}
8✔
1697
        checkargs := []string{strings.ToLower(string(stage) + "-check"), "/root", copyDir}
8✔
1698

8✔
1699
        agentEnvs := append(
8✔
1700
                _package.Env,
8✔
1701
                getAgentConfigEnvVars(opts, _package.Name, _package.Version, skyhook.ResourceID(), skyhook.Name)...,
8✔
1702
        )
8✔
1703

8✔
1704
        pod := &corev1.Pod{
8✔
1705
                ObjectMeta: metav1.ObjectMeta{
8✔
1706
                        Name:      generateSafeName(63, skyhook.Name, _package.Name, _package.Version, string(stage), nodeName),
8✔
1707
                        Namespace: opts.Namespace,
8✔
1708
                        Labels: map[string]string{
8✔
1709
                                fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX):    skyhook.Name,
8✔
1710
                                fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX): fmt.Sprintf("%s-%s", _package.Name, _package.Version),
8✔
1711
                        },
8✔
1712
                },
8✔
1713
                Spec: corev1.PodSpec{
8✔
1714
                        NodeName:      nodeName,
8✔
1715
                        RestartPolicy: corev1.RestartPolicyOnFailure,
8✔
1716
                        InitContainers: []corev1.Container{
8✔
1717
                                {
8✔
1718
                                        Name:            fmt.Sprintf("%s-init", trunstr(_package.Name, 43)),
8✔
1719
                                        Image:           getPackageImage(_package),
8✔
1720
                                        ImagePullPolicy: "Always",
8✔
1721
                                        Command:         []string{"/bin/sh"},
8✔
1722
                                        Args: []string{
8✔
1723
                                                "-c",
8✔
1724
                                                "mkdir -p /root/${SKYHOOK_DIR} && cp -r /skyhook-package/* /root/${SKYHOOK_DIR}",
8✔
1725
                                        },
8✔
1726
                                        Env: []corev1.EnvVar{
8✔
1727
                                                {
8✔
1728
                                                        Name:  "SKYHOOK_DIR",
8✔
1729
                                                        Value: copyDir,
8✔
1730
                                                },
8✔
1731
                                        },
8✔
1732
                                        SecurityContext: &corev1.SecurityContext{
8✔
1733
                                                Privileged: ptr(true),
8✔
1734
                                        },
8✔
1735
                                        VolumeMounts: volumeMounts,
8✔
1736
                                },
8✔
1737
                                {
8✔
1738
                                        Name:            fmt.Sprintf("%s-%s", trunstr(_package.Name, 43), stage),
8✔
1739
                                        Image:           getAgentImage(opts, _package),
8✔
1740
                                        ImagePullPolicy: "Always",
8✔
1741
                                        Args:            applyargs,
8✔
1742
                                        Env:             agentEnvs,
8✔
1743
                                        SecurityContext: &corev1.SecurityContext{
8✔
1744
                                                Privileged: ptr(true),
8✔
1745
                                        },
8✔
1746
                                        VolumeMounts: volumeMounts,
8✔
1747
                                },
8✔
1748
                                {
8✔
1749
                                        Name:            fmt.Sprintf("%s-%scheck", trunstr(_package.Name, 43), stage),
8✔
1750
                                        Image:           getAgentImage(opts, _package),
8✔
1751
                                        ImagePullPolicy: "Always",
8✔
1752
                                        Args:            checkargs,
8✔
1753
                                        Env:             agentEnvs,
8✔
1754
                                        SecurityContext: &corev1.SecurityContext{
8✔
1755
                                                Privileged: ptr(true),
8✔
1756
                                        },
8✔
1757
                                        VolumeMounts: volumeMounts,
8✔
1758
                                },
8✔
1759
                        },
8✔
1760
                        Containers: []corev1.Container{
8✔
1761
                                {
8✔
1762
                                        Name:  "pause",
8✔
1763
                                        Image: opts.PauseImage,
8✔
1764
                                        Resources: corev1.ResourceRequirements{
8✔
1765
                                                Limits: corev1.ResourceList{
8✔
1766
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1767
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1768
                                                },
8✔
1769
                                                Requests: corev1.ResourceList{
8✔
1770
                                                        corev1.ResourceCPU:    resource.MustParse("100m"),
8✔
1771
                                                        corev1.ResourceMemory: resource.MustParse("20Mi"),
8✔
1772
                                                },
8✔
1773
                                        },
8✔
1774
                                },
8✔
1775
                        },
8✔
1776
                        ImagePullSecrets: []corev1.LocalObjectReference{
8✔
1777
                                {
8✔
1778
                                        Name: opts.ImagePullSecret,
8✔
1779
                                },
8✔
1780
                        },
8✔
1781
                        Volumes:     volumes,
8✔
1782
                        HostPID:     true,
8✔
1783
                        HostNetwork: true,
8✔
1784
                        // If you change these go change the SelectNode toleration in cluster_state.go
8✔
1785
                        Tolerations: append([]corev1.Toleration{ // tolerate all cordon
8✔
1786
                                {
8✔
1787
                                        Key:      TaintUnschedulable,
8✔
1788
                                        Operator: corev1.TolerationOpExists,
8✔
1789
                                },
8✔
1790
                                opts.GetRuntimeRequiredToleration(),
8✔
1791
                        }, skyhook.Spec.AdditionalTolerations...),
8✔
1792
                },
8✔
1793
        }
8✔
1794
        if _package.GracefulShutdown != nil {
13✔
1795
                pod.Spec.TerminationGracePeriodSeconds = ptr(int64(_package.GracefulShutdown.Duration.Seconds()))
5✔
1796
        }
5✔
1797
        setPodResources(pod, _package.Resources)
8✔
1798
        return pod
8✔
1799
}
1800

1801
// FilterEnv removes the environment variables passed into exlude
1802
func FilterEnv(envs []corev1.EnvVar, exclude ...string) []corev1.EnvVar {
8✔
1803
        var filteredEnv []corev1.EnvVar
8✔
1804

8✔
1805
        // build map of exclude strings for faster lookup
8✔
1806
        excludeMap := make(map[string]struct{})
8✔
1807
        for _, name := range exclude {
16✔
1808
                excludeMap[name] = struct{}{}
8✔
1809
        }
8✔
1810

1811
        // If the environment variable name is in the exclude list, skip it
1812
        // otherwise append it to the final list
1813
        for _, env := range envs {
16✔
1814
                if _, found := excludeMap[env.Name]; !found {
16✔
1815
                        filteredEnv = append(filteredEnv, env)
8✔
1816
                }
8✔
1817
        }
1818

1819
        return filteredEnv
8✔
1820
}
1821

1822
// PodMatchesPackage asserts that a given pod matches the given pod spec
1823
func podMatchesPackage(opts SkyhookOperatorOptions, _package *v1alpha1.Package, pod corev1.Pod, skyhook *wrapper.Skyhook, stage v1alpha1.Stage) bool {
8✔
1824
        var expectedPod *corev1.Pod
8✔
1825

8✔
1826
        // need to differentiate whether the pod is for an interrupt or not so we know
8✔
1827
        // what to expect and how to compare them
8✔
1828
        isInterrupt := false
8✔
1829
        _, limitRange := pod.Annotations["kubernetes.io/limit-ranger"]
8✔
1830

8✔
1831
        if pod.Labels[fmt.Sprintf("%s/interrupt", v1alpha1.METADATA_PREFIX)] == "True" {
14✔
1832
                expectedPod = createInterruptPodForPackage(opts, &v1alpha1.Interrupt{}, "", _package, skyhook, "")
6✔
1833
                isInterrupt = true
6✔
1834
        } else {
14✔
1835
                expectedPod = createPodFromPackage(opts, _package, skyhook, "", stage)
8✔
1836
        }
8✔
1837

1838
        actualPod := pod.DeepCopy()
8✔
1839

8✔
1840
        // check to see whether the name or the version of the package changed
8✔
1841
        packageLabel := fmt.Sprintf("%s/package", v1alpha1.METADATA_PREFIX)
8✔
1842
        if actualPod.Labels[packageLabel] != expectedPod.Labels[packageLabel] {
14✔
1843
                return false
6✔
1844
        }
6✔
1845

1846
        // compare initContainers since this is where a lot of the important info lives
1847
        for i := range actualPod.Spec.InitContainers {
16✔
1848
                expectedContainer := expectedPod.Spec.InitContainers[i]
8✔
1849
                actualContainer := actualPod.Spec.InitContainers[i]
8✔
1850

8✔
1851
                if expectedContainer.Name != actualContainer.Name {
9✔
1852
                        return false
1✔
1853
                }
1✔
1854

1855
                if expectedContainer.Image != actualContainer.Image {
8✔
1856
                        return false
×
1857
                }
×
1858

1859
                // compare the containers env vars except for the ones that are inserted
1860
                // by the operator by default as the SKYHOOK_RESOURCE_ID will change every
1861
                // time the skyhook is updated and would cause every pod to be removed
1862
                // TODO: This is ignoring all the static env vars that are set by operator config.
1863
                // It probably should be just SKYHOOK_RESOURCE_ID that is ignored. Otherwise,
1864
                // a user will have to manually delete the pod to update the package when operator is updated.
1865
                dummyAgentEnv := getAgentConfigEnvVars(opts, "", "", "", "")
8✔
1866
                excludedEnvs := make([]string, len(dummyAgentEnv))
8✔
1867
                for i, env := range dummyAgentEnv {
16✔
1868
                        excludedEnvs[i] = env.Name
8✔
1869
                }
8✔
1870
                expectedFilteredEnv := FilterEnv(expectedContainer.Env, excludedEnvs...)
8✔
1871
                actualFilteredEnv := FilterEnv(actualContainer.Env, excludedEnvs...)
8✔
1872
                if !reflect.DeepEqual(expectedFilteredEnv, actualFilteredEnv) {
14✔
1873
                        return false
6✔
1874
                }
6✔
1875

1876
                if !isInterrupt { // dont compare these since they are not configured on interrupt
16✔
1877
                        // compare resource requests and limits (CPU, memory, etc.)
8✔
1878
                        expectedResources := expectedContainer.Resources
8✔
1879
                        actualResources := actualContainer.Resources
8✔
1880
                        if skyhook.Spec.Packages[_package.Name].Resources != nil {
14✔
1881
                                // If CR has resources specified, they should match exactly
6✔
1882
                                if !reflect.DeepEqual(expectedResources, actualResources) {
7✔
1883
                                        return false
1✔
1884
                                }
1✔
1885
                        } else {
8✔
1886
                                // If CR has no resources specified, ensure pod has no resource overrides
8✔
1887
                                if !limitRange {
16✔
1888
                                        if actualResources.Requests != nil || actualResources.Limits != nil {
9✔
1889
                                                return false
1✔
1890
                                        }
1✔
1891
                                }
1892
                        }
1893
                }
1894
        }
1895

1896
        return true
8✔
1897
}
1898

1899
// ValidateRunningPackages deletes pods that don't match the current spec and checks if there are pods running
1900
// that don't match the node state and removes them if they exist
1901
func (r *SkyhookReconciler) ValidateRunningPackages(ctx context.Context, skyhook SkyhookNodes) (bool, error) {
7✔
1902

7✔
1903
        update := false
7✔
1904
        errs := make([]error, 0)
7✔
1905
        // get all pods for this skyhook packages
7✔
1906
        pods, err := r.dal.GetPods(ctx,
7✔
1907
                client.MatchingLabels{
7✔
1908
                        fmt.Sprintf("%s/name", v1alpha1.METADATA_PREFIX): skyhook.GetSkyhook().Name,
7✔
1909
                },
7✔
1910
        )
7✔
1911
        if err != nil {
7✔
1912
                return false, fmt.Errorf("error getting pods while validating packages: %w", err)
×
1913
        }
×
1914
        if pods == nil || len(pods.Items) == 0 {
14✔
1915
                return false, nil // nothing running for this skyhook on this node
7✔
1916
        }
7✔
1917

1918
        // Initialize metrics for each stage
1919
        stages := make(map[string]map[string]map[v1alpha1.Stage]int)
7✔
1920

7✔
1921
        // group pods by node
7✔
1922
        podsbyNode := make(map[string][]corev1.Pod)
7✔
1923
        for _, pod := range pods.Items {
14✔
1924
                podsbyNode[pod.Spec.NodeName] = append(podsbyNode[pod.Spec.NodeName], pod)
7✔
1925
        }
7✔
1926

1927
        for _, node := range skyhook.GetNodes() {
14✔
1928
                nodeState, err := node.State()
7✔
1929
                if err != nil {
7✔
1930
                        return false, fmt.Errorf("error getting node state: %w", err)
×
1931
                }
×
1932

1933
                for _, pod := range podsbyNode[node.GetNode().Name] {
14✔
1934
                        found := false
7✔
1935

7✔
1936
                        runningPackage, err := GetPackage(&pod)
7✔
1937
                        if err != nil {
7✔
1938
                                errs = append(errs, fmt.Errorf("error getting package from pod [%s:%s] while validating packages: %w", pod.Namespace, pod.Name, err))
×
1939
                        }
×
1940

1941
                        // check if the package is part of the skyhook spec, if not we need to delete it
1942
                        for _, v := range skyhook.GetSkyhook().Spec.Packages {
14✔
1943
                                if podMatchesPackage(r.opts, &v, pod, skyhook.GetSkyhook(), runningPackage.Stage) {
14✔
1944
                                        found = true
7✔
1945
                                }
7✔
1946
                        }
1947

1948
                        // Increment the stage count for metrics
1949
                        if _, ok := stages[runningPackage.Name]; !ok {
14✔
1950
                                stages[runningPackage.Name] = make(map[string]map[v1alpha1.Stage]int)
7✔
1951
                                if _, ok := stages[runningPackage.Name][runningPackage.Version]; !ok {
14✔
1952
                                        stages[runningPackage.Name][runningPackage.Version] = make(map[v1alpha1.Stage]int)
7✔
1953
                                        for _, stage := range v1alpha1.Stages {
14✔
1954
                                                stages[runningPackage.Name][runningPackage.Version][stage] = 0
7✔
1955
                                        }
7✔
1956
                                }
1957
                        }
1958
                        stages[runningPackage.Name][runningPackage.Version][runningPackage.Stage]++
7✔
1959

7✔
1960
                        // uninstall is by definition not part of the skyhook spec, so we cant delete it (because it used to be but was removed, hence uninstalling it)
7✔
1961
                        if runningPackage.Stage == v1alpha1.StageUninstall {
12✔
1962
                                found = true
5✔
1963
                        }
5✔
1964

1965
                        if !found {
12✔
1966
                                update = true
5✔
1967

5✔
1968
                                err := r.InvalidPackage(ctx, &pod)
5✔
1969
                                if err != nil {
9✔
1970
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
4✔
1971
                                }
4✔
1972
                                continue
5✔
1973
                        }
1974

1975
                        // Check if package exists in node state, ie a package running that the node state doesn't know about
1976
                        // something that is often done to try to fix bad node state is to clear the node state completely
1977
                        // which if a package is running, we want to terminate it gracefully. Ofthen what leads to this is
1978
                        // the package is in a crashloop and the operator want to restart it the whole package.
1979
                        // when we apply a package it just check if there is a running package on the node for the state of the package
1980
                        // this can cause to leave a pod running in say config mode, and it there is a depends on you might not correctly
1981
                        // run thins in the correct order.
1982
                        deleteMe := false
7✔
1983
                        packageStatus, exists := nodeState[runningPackage.GetUniqueName()]
7✔
1984
                        if !exists { // package not in node state, so we need to delete it
14✔
1985
                                deleteMe = true
7✔
1986
                        } else { // package in node state, so we need to check if it's running
14✔
1987
                                // need check if the stats match, if not we need to delete it
7✔
1988
                                if packageStatus.Stage != runningPackage.Stage {
13✔
1989
                                        deleteMe = true
6✔
1990
                                }
6✔
1991
                        }
1992

1993
                        if deleteMe {
14✔
1994
                                update = true
7✔
1995
                                err := r.InvalidPackage(ctx, &pod)
7✔
1996
                                if err != nil {
13✔
1997
                                        errs = append(errs, fmt.Errorf("error invalidating package: %w", err))
6✔
1998
                                }
6✔
1999
                        }
2000
                }
2001
        }
2002

2003
        return update, utilerrors.NewAggregate(errs)
7✔
2004
}
2005

2006
// InvalidPackage invalidates a package and updates the pod, which will trigger the pod to be deleted
2007
func (r *SkyhookReconciler) InvalidPackage(ctx context.Context, pod *corev1.Pod) error {
7✔
2008
        err := InvalidatePackage(pod)
7✔
2009
        if err != nil {
7✔
2010
                return fmt.Errorf("error invalidating package: %w", err)
×
2011
        }
×
2012

2013
        err = r.Update(ctx, pod)
7✔
2014
        if err != nil {
13✔
2015
                return fmt.Errorf("error updating pod: %w", err)
6✔
2016
        }
6✔
2017

2018
        return nil
7✔
2019
}
2020

2021
// ProcessInterrupt will check and do the interrupt if need, and returns
2022
// false means we are waiting
2023
// true means we are good to proceed
2024
func (r *SkyhookReconciler) ProcessInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, interrupt *v1alpha1.Interrupt, runInterrupt bool) (bool, error) {
7✔
2025

7✔
2026
        if !skyhookNode.HasInterrupt(*_package) {
14✔
2027
                return true, nil
7✔
2028
        }
7✔
2029

2030
        // default starting stage
2031
        stage := v1alpha1.StageApply
5✔
2032
        nextStage := skyhookNode.NextStage(_package)
5✔
2033
        if nextStage != nil {
10✔
2034
                stage = *nextStage
5✔
2035
        }
5✔
2036

2037
        // wait tell this is done if its happening
2038
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
5✔
2039
        if found && status.State == v1alpha1.StateSkipped {
10✔
2040
                return false, nil
5✔
2041
        }
5✔
2042

2043
        // Theres is a race condition when a node reboots and api cleans up the interrupt pod
2044
        // so we need to check if the pod exists and if it does, we need to recreate it
2045
        if status != nil && (status.State == v1alpha1.StateInProgress || status.State == v1alpha1.StateErroring) && status.Stage == v1alpha1.StageInterrupt {
10✔
2046
                // call interrupt to recreate the pod if missing
5✔
2047
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2048
                if err != nil {
5✔
2049
                        return false, err
×
2050
                }
×
2051
        }
2052

2053
        // drain and cordon node before applying package that has an interrupt
2054
        if stage == v1alpha1.StageApply {
10✔
2055
                ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, _package)
5✔
2056
                if err != nil {
5✔
2057
                        return false, err
×
2058
                }
×
2059

2060
                if !ready {
10✔
2061
                        return false, nil
5✔
2062
                }
5✔
2063
        }
2064

2065
        // time to interrupt (once other packages have finished)
2066
        if stage == v1alpha1.StageInterrupt && runInterrupt {
10✔
2067
                err := r.Interrupt(ctx, skyhookNode, _package, interrupt)
5✔
2068
                if err != nil {
5✔
2069
                        return false, err
×
2070
                }
×
2071

2072
                return false, nil
5✔
2073
        }
2074

2075
        //skipping
2076
        if stage == v1alpha1.StageInterrupt && !runInterrupt {
10✔
2077
                err := skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateSkipped, stage, 0, _package.ContainerSHA)
5✔
2078
                if err != nil {
5✔
2079
                        return false, fmt.Errorf("error upserting to skip interrupt: %w", err)
×
2080
                }
×
2081
                return false, nil
5✔
2082
        }
2083

2084
        // wait tell this is done if its happening
2085
        if status != nil && status.Stage == v1alpha1.StageInterrupt && status.State != v1alpha1.StateComplete {
10✔
2086
                return false, nil
5✔
2087
        }
5✔
2088

2089
        return true, nil
5✔
2090
}
2091

2092
func (r *SkyhookReconciler) EnsureNodeIsReadyForInterrupt(ctx context.Context, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package) (bool, error) {
5✔
2093
        // cordon node
5✔
2094
        skyhookNode.Cordon()
5✔
2095

5✔
2096
        hasWork, err := r.HasNonInterruptWork(ctx, skyhookNode)
5✔
2097
        if err != nil {
5✔
2098
                return false, err
×
2099
        }
×
2100
        if hasWork { // keep waiting...
10✔
2101
                return false, nil
5✔
2102
        }
5✔
2103

2104
        ready, err := r.DrainNode(ctx, skyhookNode, _package)
5✔
2105
        if err != nil {
5✔
2106
                return false, fmt.Errorf("error draining node [%s]: %w", skyhookNode.GetNode().Name, err)
×
2107
        }
×
2108

2109
        return ready, nil
5✔
2110
}
2111

2112
// ApplyPackage starts a pod on node for the package
2113
func (r *SkyhookReconciler) ApplyPackage(ctx context.Context, logger logr.Logger, clusterState *clusterState, skyhookNode wrapper.SkyhookNode, _package *v1alpha1.Package, runInterrupt bool) error {
7✔
2114

7✔
2115
        if _package == nil {
7✔
2116
                return errors.New("can not apply nil package")
×
2117
        }
×
2118

2119
        // default starting stage
2120
        stage := v1alpha1.StageApply
7✔
2121

7✔
2122
        // These modes don't have anything that comes before them so we must specify them as the
7✔
2123
        // starting point. The next stage function will return nil until these modes complete.
7✔
2124
        // Config is a special case as sometimes apply will come before it and other times it wont
7✔
2125
        // which is why it needs to be here as well
7✔
2126
        if packageStatus, found := skyhookNode.PackageStatus(_package.GetUniqueName()); found {
14✔
2127
                switch packageStatus.Stage {
7✔
2128
                case v1alpha1.StageConfig, v1alpha1.StageUpgrade, v1alpha1.StageUninstall:
7✔
2129
                        stage = packageStatus.Stage
7✔
2130
                }
2131
        }
2132

2133
        // if stage != v1alpha1.StageApply {
2134
        //         // If a node gets rest by a user, the about method will return the wrong node state. Above sources it from the skyhook status.
2135
        //         // check if the node has nothing, reset it then apply the package.
2136
        //         nodeState, err := skyhookNode.State()
2137
        //         if err != nil {
2138
        //                 return fmt.Errorf("error getting node state: %w", err)
2139
        //         }
2140

2141
        //         _, found := nodeState[_package.GetUniqueName()]
2142
        //         if !found {
2143
        //                 stage = v1alpha1.StageApply
2144
        //         }
2145
        // }
2146

2147
        nextStage := skyhookNode.NextStage(_package)
7✔
2148
        if nextStage != nil {
14✔
2149
                stage = *nextStage
7✔
2150
        }
7✔
2151

2152
        // test if pod exists, if so, bailout
2153
        exists, err := r.PodExists(ctx, skyhookNode.GetNode().Name, skyhookNode.GetSkyhook().Name, _package)
7✔
2154
        if err != nil {
7✔
2155
                return err
×
2156
        }
×
2157

2158
        // wait tell this is done if its happening
2159
        status, found := skyhookNode.PackageStatus(_package.GetUniqueName())
7✔
2160

7✔
2161
        if found && status.State == v1alpha1.StateSkipped { // skipped, so nothing to do
7✔
2162
                return nil
×
2163
        }
×
2164

2165
        if found && status.State == v1alpha1.StateInProgress { // running, so do nothing atm
14✔
2166
                if exists {
14✔
2167
                        return nil
7✔
2168
                }
7✔
2169
        }
2170

2171
        if exists {
14✔
2172
                // nothing to do here, already running
7✔
2173
                return nil
7✔
2174
        }
7✔
2175

2176
        pod := createPodFromPackage(r.opts, _package, skyhookNode.GetSkyhook(), skyhookNode.GetNode().Name, stage)
7✔
2177

7✔
2178
        if err := SetPackages(pod, skyhookNode.GetSkyhook().Skyhook, _package.Image, stage, _package); err != nil {
7✔
2179
                return fmt.Errorf("error setting package on pod: %w", err)
×
2180
        }
×
2181

2182
        // setup ownership of the pod we created
2183
        // helps run time know what to do when something happens to this pod we are about to create
2184
        if err := ctrl.SetControllerReference(skyhookNode.GetSkyhook().Skyhook, pod, r.scheme); err != nil {
7✔
2185
                return fmt.Errorf("error setting ownership: %w", err)
×
2186
        }
×
2187

2188
        if err := r.Create(ctx, pod); err != nil {
7✔
2189
                return fmt.Errorf("error creating pod: %w", err)
×
2190
        }
×
2191

2192
        if err = skyhookNode.Upsert(_package.PackageRef, _package.Image, v1alpha1.StateInProgress, stage, 0, _package.ContainerSHA); err != nil {
7✔
2193
                err = fmt.Errorf("error upserting package: %w", err) // want to keep going in this case, but don't want to lose the err
×
2194
        }
×
2195

2196
        skyhookNode.SetStatus(v1alpha1.StatusInProgress)
7✔
2197

7✔
2198
        skyhookNode.GetSkyhook().AddCondition(metav1.Condition{
7✔
2199
                Type:               fmt.Sprintf("%s/ApplyPackage", v1alpha1.METADATA_PREFIX),
7✔
2200
                Status:             metav1.ConditionTrue,
7✔
2201
                ObservedGeneration: skyhookNode.GetSkyhook().Generation,
7✔
2202
                LastTransitionTime: metav1.Now(),
7✔
2203
                Reason:             "ApplyPackage",
7✔
2204
                Message:            fmt.Sprintf("Applying package [%s:%s] to node [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name),
7✔
2205
        })
7✔
2206

7✔
2207
        r.recorder.Eventf(skyhookNode.GetNode(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] from [skyhook:%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetSkyhook().Name, stage)
7✔
2208
        r.recorder.Eventf(skyhookNode.GetSkyhook(), EventTypeNormal, EventsReasonSkyhookApply, "Applying package [%s:%s] to node [%s] stage [%s]", _package.Name, _package.Version, skyhookNode.GetNode().Name, stage)
7✔
2209

7✔
2210
        skyhookNode.GetSkyhook().Updated = true
7✔
2211

7✔
2212
        return err
7✔
2213
}
2214

2215
// HandleRuntimeRequired finds any nodes for which all runtime required Skyhooks are complete and remove their runtime required taint
2216
// Will return an error if the patching of the nodes is not possible
2217
func (r *SkyhookReconciler) HandleRuntimeRequired(ctx context.Context, clusterState *clusterState) error {
7✔
2218
        node_to_skyhooks, skyhook_node_map := groupSkyhooksByNode(clusterState)
7✔
2219
        to_remove := getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks, skyhook_node_map)
7✔
2220
        // Remove the runtime required taint from nodes in to_remove
7✔
2221
        taint_to_remove := r.opts.GetRuntimeRequiredTaint()
7✔
2222
        errs := make([]error, 0)
7✔
2223
        for _, node := range to_remove {
12✔
2224
                // check before removing taint that it even exists to begin with
5✔
2225
                if !taints.TaintExists(node.Spec.Taints, &taint_to_remove) {
10✔
2226
                        continue
5✔
2227
                }
2228
                // RemoveTaint will ALWAYS return nil for its error so no need to check it
2229
                new_node, updated, _ := taints.RemoveTaint(node, &taint_to_remove)
5✔
2230
                if updated {
10✔
2231
                        err := r.Patch(ctx, new_node, client.MergeFrom(node))
5✔
2232
                        if err != nil {
5✔
2233
                                errs = append(errs, err)
×
2234
                        }
×
2235
                }
2236
        }
2237
        if len(errs) > 0 {
7✔
2238
                return utilerrors.NewAggregate(errs)
×
2239
        }
×
2240
        return nil
7✔
2241
}
2242

2243
// Group Skyhooks by what node they target
2244
func groupSkyhooksByNode(clusterState *clusterState) (map[types.UID][]SkyhookNodes, map[types.UID]*corev1.Node) {
8✔
2245
        node_to_skyhooks := make(map[types.UID][]SkyhookNodes)
8✔
2246
        nodes := make(map[types.UID]*corev1.Node)
8✔
2247
        for _, skyhook := range clusterState.skyhooks {
16✔
2248
                // Ignore skyhooks that don't have runtime required
8✔
2249
                if !skyhook.GetSkyhook().Spec.RuntimeRequired {
16✔
2250
                        continue
8✔
2251
                }
2252
                for _, node := range skyhook.GetNodes() {
12✔
2253
                        if _, ok := node_to_skyhooks[node.GetNode().UID]; !ok {
12✔
2254
                                node_to_skyhooks[node.GetNode().UID] = make([]SkyhookNodes, 0)
6✔
2255
                                nodes[node.GetNode().UID] = node.GetNode()
6✔
2256
                        }
6✔
2257
                        node_to_skyhooks[node.GetNode().UID] = append(node_to_skyhooks[node.GetNode().UID], skyhook)
6✔
2258
                }
2259

2260
        }
2261
        return node_to_skyhooks, nodes
8✔
2262
}
2263

2264
// Get the nodes to remove runtime required taint from node that all skyhooks targeting that node have completed
2265
func getRuntimeRequiredTaintCompleteNodes(node_to_skyhooks map[types.UID][]SkyhookNodes, nodes map[types.UID]*corev1.Node) []*corev1.Node {
8✔
2266
        to_remove := make([]*corev1.Node, 0)
8✔
2267
        for node_uid, skyhooks := range node_to_skyhooks {
14✔
2268
                all_complete := true
6✔
2269
                for _, skyhook := range skyhooks {
12✔
2270
                        if !skyhook.IsComplete() {
12✔
2271
                                all_complete = false
6✔
2272
                                break
6✔
2273
                        }
2274
                }
2275
                if all_complete {
12✔
2276
                        to_remove = append(to_remove, nodes[node_uid])
6✔
2277
                }
6✔
2278
        }
2279
        return to_remove
8✔
2280
}
2281

2282
// setPodResources sets resources for all containers and init containers in the pod if override is set, else leaves empty for LimitRange
2283
func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
8✔
2284
        if res == nil {
16✔
2285
                return
8✔
2286
        }
8✔
2287
        if !res.CPURequest.IsZero() || !res.CPULimit.IsZero() || !res.MemoryRequest.IsZero() || !res.MemoryLimit.IsZero() {
12✔
2288
                for i := range pod.Spec.InitContainers {
12✔
2289
                        pod.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{
6✔
2290
                                Limits: corev1.ResourceList{
6✔
2291
                                        corev1.ResourceCPU:    res.CPULimit,
6✔
2292
                                        corev1.ResourceMemory: res.MemoryLimit,
6✔
2293
                                },
6✔
2294
                                Requests: corev1.ResourceList{
6✔
2295
                                        corev1.ResourceCPU:    res.CPURequest,
6✔
2296
                                        corev1.ResourceMemory: res.MemoryRequest,
6✔
2297
                                },
6✔
2298
                        }
6✔
2299
                }
6✔
2300
        }
2301
}
2302

2303
// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
2304
func partitionNodesIntoCompartments(clusterState *clusterState) error {
8✔
2305
        for _, skyhook := range clusterState.skyhooks {
16✔
2306
                // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState)
8✔
2307
                if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" {
16✔
2308
                        continue
8✔
2309
                }
2310

2311
                // Skip if no compartments exist (e.g., deployment policy not found)
2312
                // The webhook should prevent this at admission time, and the controller sets a condition at runtime,
2313
                // but we guard here to prevent panics if the policy goes missing
2314
                if len(skyhook.GetCompartments()) == 0 {
3✔
2315
                        continue
1✔
2316
                }
2317

2318
                // Clear all compartments before reassigning nodes to prevent stale nodes
2319
                // This ensures nodes are only in their current compartment based on current labels
2320
                for _, compartment := range skyhook.GetCompartments() {
4✔
2321
                        compartment.ClearNodes()
2✔
2322
                }
2✔
2323

2324
                for _, node := range skyhook.GetNodes() {
4✔
2325
                        compartmentName, err := skyhook.AssignNodeToCompartment(node)
2✔
2326
                        if err != nil {
2✔
2327
                                return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
×
2328
                        }
×
2329
                        if err := skyhook.AddCompartmentNode(compartmentName, node); err != nil {
2✔
2330
                                return fmt.Errorf("error adding node %s to compartment %s: %w", node.GetNode().Name, compartmentName, err)
×
2331
                        }
×
2332
                }
2333
        }
2334

2335
        return nil
8✔
2336
}
2337

2338
// validateAndUpsertSkyhookData performs validation and configmap operations for a skyhook
2339
func (r *SkyhookReconciler) validateAndUpsertSkyhookData(ctx context.Context, skyhook SkyhookNodes, clusterState *clusterState) (bool, ctrl.Result, error) {
7✔
2340
        if yes, result, err := shouldReturn(r.ValidateRunningPackages(ctx, skyhook)); yes {
14✔
2341
                return yes, result, err
7✔
2342
        }
7✔
2343

2344
        if yes, result, err := shouldReturn(r.ValidateNodeConfigmaps(ctx, skyhook.GetSkyhook().Name, skyhook.GetNodes())); yes {
12✔
2345
                return yes, result, err
5✔
2346
        }
5✔
2347

2348
        if yes, result, err := shouldReturn(r.UpsertConfigmaps(ctx, skyhook, clusterState)); yes {
12✔
2349
                return yes, result, err
5✔
2350
        }
5✔
2351

2352
        return false, ctrl.Result{}, nil
7✔
2353
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc