• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NVIDIA / skyhook / 22809174246

07 Mar 2026 11:06PM UTC coverage: 80.943%. First build
22809174246

Pull #176

github

web-flow
Merge 43ddb1caf into f9b9e8b99
Pull Request #176: feat: add sequencing: node or all

58 of 64 new or added lines in 3 files covered. (90.63%)

6902 of 8527 relevant lines covered (80.94%)

26.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.32
/operator/internal/wrapper/node.go
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
 * SPDX-License-Identifier: Apache-2.0
4
 *
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
package wrapper
20

21
import (
22
        "encoding/json"
23
        "fmt"
24
        "sort"
25
        "strings"
26

27
        "github.com/NVIDIA/skyhook/operator/api/v1alpha1"
28
        "github.com/NVIDIA/skyhook/operator/internal/graph"
29
        "github.com/NVIDIA/skyhook/operator/internal/version"
30
        "github.com/go-logr/logr"
31
        corev1 "k8s.io/api/core/v1"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
)
34

35
// there are 2 interfaces to reflect functions that need a skyhook and node
36
// and ones that just need a node
37

38
// SkyhookNode wraps a node with a supporting skyhook
39
type SkyhookNode interface {
40
        SkyhookNodeOnly
41
        GetSkyhook() *Skyhook
42
        GetComplete() []string
43
        SetStatus(status v1alpha1.Status)
44
        IsComplete() bool
45
        ProgressSkipped()
46
        IsPackageComplete(_package v1alpha1.Package) bool
47
        RunNext() ([]*v1alpha1.Package, error)
48
        NextStage(_package *v1alpha1.Package) *v1alpha1.Stage
49
        HasInterrupt(_package v1alpha1.Package) bool
50
        UpdateCondition()
51
        HasSkyhookAnnotations() bool
52
}
53

54
// SkyhookNodeOnly wraps the node with just a skyhook name
55
type SkyhookNodeOnly interface {
56
        Status() v1alpha1.Status
57
        // SetStatus is in both interfaces, does more if skyhook is not nil
58
        SetStatus(status v1alpha1.Status)
59
        PackageStatus(name string) (*v1alpha1.PackageStatus, bool)
60
        SetVersion()
61
        GetVersion() string
62
        Migrate(logger logr.Logger) error
63
        State() (v1alpha1.NodeState, error)
64
        SetState(state v1alpha1.NodeState) error
65
        RemoveState(_package v1alpha1.PackageRef) error
66
        Upsert(_package v1alpha1.PackageRef, image string, state v1alpha1.State, stage v1alpha1.Stage, restarts int32, containerSHA string) error
67
        GetNode() *corev1.Node
68
        Taint(key string)
69
        RemoveTaint(key string)
70
        Cordon()
71
        Uncordon()
72
        Reset()
73
        Changed() bool
74
}
75

76
var _ SkyhookNode = &skyhookNode{}
77

78
// NewSkyhookNodeOnly most of use cases for the wrapper just needs name, so this stub is for making helpers for those use cases,
79
// should help reduce calls to api, and not leak stubbed skyhooks with just name set
80
func NewSkyhookNodeOnly(node *corev1.Node, skyhookName string) (SkyhookNodeOnly, error) {
49✔
81
        ret := &skyhookNode{
49✔
82
                Node:        node,
49✔
83
                skyhookName: skyhookName,
49✔
84
        }
49✔
85
        state, err := ret.State()
49✔
86
        if err != nil {
49✔
NEW
87
                return nil, fmt.Errorf("error creating skyhookNode: %w", err)
×
88
        }
×
89
        ret.nodeState = state
49✔
90
        return ret, nil
49✔
91
}
92

93
// Convert will upgrade this to be the full interface if you have a skyhook
94
func Convert(node SkyhookNodeOnly, skyhook *v1alpha1.Skyhook) (SkyhookNode, error) {
49✔
95
        ret := node.(*skyhookNode)
49✔
96
        ret.skyhook = &Skyhook{Skyhook: skyhook}
49✔
97

49✔
98
        graph, err := skyhook.Spec.BuildGraph()
49✔
99
        if err != nil {
49✔
100
                return nil, err
×
NEW
101
        }
×
102

103
        ret.graph = graph
49✔
104

49✔
105
        return ret, nil
49✔
106
}
107

108
func NewSkyhookNode(node *corev1.Node, skyhook *v1alpha1.Skyhook) (SkyhookNode, error) {
49✔
109

49✔
110
        t, err := NewSkyhookNodeOnly(node, skyhook.Name)
49✔
111
        if err != nil {
49✔
112
                return nil, err
×
113
        }
×
114

115
        return Convert(t, skyhook)
49✔
116
}
117

118
// skyhookNode is the concrete wrapper behind both SkyhookNodeOnly and
// SkyhookNode.
type skyhookNode struct {
	// the wrapped node; its fields and methods are promoted onto the wrapper
	*corev1.Node
	// name of the skyhook, used to build the per-skyhook annotation, label and condition keys
	skyhookName string
	// full skyhook; nil until set by Convert
	skyhook *Skyhook
	// node state cached from the node's nodeState annotation
	nodeState v1alpha1.NodeState
	// package dependency graph, set by Convert
	graph graph.DependencyGraph[*v1alpha1.Package]
	// set to true whenever the wrapper modifies the node; reported by Changed
	updated bool
}
126

127
// GetSkyhook implements sskyhookNode.
128
func (node *skyhookNode) GetSkyhook() *Skyhook {
42✔
129
        return node.skyhook
42✔
130
}
42✔
131

132
// GetNode implements sskyhookNode.
133
func (node *skyhookNode) GetNode() *corev1.Node {
42✔
134
        return node.Node
42✔
135
}
42✔
136

137
func (node *skyhookNode) SetStatus(status v1alpha1.Status) {
42✔
138

42✔
139
        s, ok := node.Annotations[fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
42✔
140
        if !ok || s != string(status) {
84✔
141
                if node.Annotations == nil {
42✔
142
                        node.Annotations = make(map[string]string)
×
143
                }
×
144
                if node.Labels == nil {
42✔
145
                        node.Labels = make(map[string]string)
×
146
                }
×
147
                node.updated = true
42✔
148
                node.Annotations[fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = string(status)
42✔
149
                node.Labels[fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = string(status)
42✔
150
        }
151

152
        if status == v1alpha1.StatusComplete {
84✔
153
                node.Uncordon()
42✔
154
        }
42✔
155

156
        if node.skyhook != nil {
84✔
157
                node.skyhook.SetNodeStatus(node.Node.Name, status)
42✔
158
                node.skyhook.SetNodeState(node.Node.Name, node.nodeState)
42✔
159
        }
42✔
160
}
161

162
func (node *skyhookNode) Status() v1alpha1.Status {
42✔
163
        status, ok := node.Annotations[fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
42✔
164
        if !ok {
84✔
165
                return v1alpha1.StatusUnknown
42✔
166
        }
42✔
167
        return v1alpha1.GetStatus(status)
42✔
168
}
169

170
func (node *skyhookNode) State() (v1alpha1.NodeState, error) {
49✔
171

49✔
172
        if node.nodeState != nil {
91✔
173
                return node.nodeState, nil
42✔
174
        }
42✔
175

176
        if node == nil {
49✔
177
                return nil, nil
×
178
        }
×
179
        s, ok := node.Annotations[fmt.Sprintf("%s/nodeState_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
49✔
180
        if !ok {
98✔
181
                return nil, nil
49✔
182
        }
49✔
183

184
        ret := v1alpha1.NodeState{}
42✔
185
        err := json.Unmarshal([]byte(s), &ret)
42✔
186
        if err != nil {
42✔
187
                return nil, fmt.Errorf("error unmarshalling node state: %w", err)
×
188
        }
×
189

190
        return ret, nil
42✔
191
}
192

193
func (node *skyhookNode) PackageStatus(name string) (*v1alpha1.PackageStatus, bool) {
42✔
194
        packageStatus := node.nodeState.Get(name)
42✔
195
        if packageStatus != nil {
84✔
196
                return packageStatus, true
42✔
197
        }
42✔
198

199
        return nil, false
42✔
200
}
201

202
func (node *skyhookNode) SetVersion() {
42✔
203

42✔
204
        current := node.GetVersion()
42✔
205
        if current == version.VERSION { // if has not changed, do nothing and not set updated
42✔
206
                return
×
207
        }
×
208

209
        if version.VERSION == "" { // was not compiled with version, so do nothing
42✔
210
                return
×
211
        }
×
212

213
        if node.Annotations == nil {
42✔
214
                node.Annotations = map[string]string{}
×
215
        }
×
216

217
        node.Annotations[fmt.Sprintf("%s/version_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = version.VERSION
42✔
218
        node.updated = true
42✔
219
}
220

221
func (node *skyhookNode) GetVersion() string {
42✔
222
        version, ok := node.Annotations[fmt.Sprintf("%s/version_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
42✔
223
        if !ok {
84✔
224
                return ""
42✔
225
        }
42✔
226
        return version
42✔
227
}
228

229
func (node *skyhookNode) Migrate(logger logr.Logger) error {
42✔
230

42✔
231
        from := node.GetVersion()
42✔
232
        to := version.VERSION
42✔
233

42✔
234
        if from == to { // already migrated
42✔
235
                return nil
×
236
        }
×
237

238
        mm := version.MajorMinor(from)
42✔
239
        switch mm {
42✔
240
        // because there was a bug in versioning, this same migration needs to be run for more then just the v0.5 releases
241
        // empty string is for before versioning was added
242
        case "", "v0.5", "v0.6", "v0.7":
42✔
243
                err := migrateNodeTo_0_5_0(node, logger)
42✔
244
                if err != nil {
42✔
245
                        return err
×
246
                }
×
247
                node.SetVersion()
42✔
248
                return nil
42✔
249
        }
250

251
        return nil
×
252
}
253

254
func (node *skyhookNode) SetState(state v1alpha1.NodeState) error {
49✔
255
        if node == nil || state == nil {
49✔
256
                return nil
×
257
        }
×
258

259
        data, err := json.Marshal(state)
49✔
260
        if err != nil {
49✔
261
                return fmt.Errorf("error marshalling node state: %w", err)
×
262
        }
×
263

264
        if node.Annotations == nil {
56✔
265
                node.Annotations = map[string]string{}
7✔
266
        }
7✔
267

268
        s, ok := node.Annotations[fmt.Sprintf("%s/nodeState_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
49✔
269
        if !ok || s != string(data) {
98✔
270
                node.Annotations[fmt.Sprintf("%s/nodeState_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = string(data)
49✔
271
                node.nodeState = state
49✔
272
                node.updated = true
49✔
273
        }
49✔
274

275
        return nil
49✔
276
}
277

278
func (node *skyhookNode) RemoveState(_package v1alpha1.PackageRef) error {
28✔
279
        changed := node.nodeState.RemoveState(_package)
28✔
280
        if changed {
56✔
281
                return node.SetState(node.nodeState)
28✔
282
        }
28✔
283

284
        return nil
×
285
}
286

287
func (node *skyhookNode) Upsert(_package v1alpha1.PackageRef, image string, state v1alpha1.State, stage v1alpha1.Stage, restarts int32, containerSHA string) error {
49✔
288
        changed := node.nodeState.Upsert(_package, image, state, stage, restarts, containerSHA)
49✔
289
        if changed {
98✔
290
                if node.skyhook != nil {
98✔
291
                        node.skyhook.Updated = true
49✔
292
                }
49✔
293

294
                return node.SetState(node.nodeState)
49✔
295
        }
296
        return nil
35✔
297
}
298

299
func (node *skyhookNode) IsPackageComplete(_package v1alpha1.Package) bool {
28✔
300
        return node.nodeState.IsPackageComplete(_package, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates())
28✔
301
}
28✔
302

303
func (node *skyhookNode) IsComplete() bool {
42✔
304
        return node.nodeState.IsComplete(node.skyhook.Spec.Packages, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates())
42✔
305
}
42✔
306

307
func (node *skyhookNode) GetComplete() []string {
49✔
308
        return node.nodeState.GetComplete(node.skyhook.Spec.Packages, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates())
49✔
309
}
49✔
310

311
func (node *skyhookNode) ProgressSkipped() {
28✔
312
        if node.nodeState.ProgressSkipped(node.skyhook.Spec.Packages, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates()) {
56✔
313
                node.skyhook.Updated = true
28✔
314
                node.updated = true
28✔
315
        }
28✔
316
}
317

318
func (node *skyhookNode) RunNext() ([]*v1alpha1.Package, error) {
49✔
319
        complete := node.GetComplete()
49✔
320

49✔
321
        // Get next available nodes based on completed dependencies
49✔
322
        next, err := node.graph.Next(complete...)
49✔
323
        if err != nil {
49✔
324
                return nil, err
×
325
        }
×
326

327
        toRun := node.graph.Get(next...)
49✔
328

49✔
329
        // Sort for deterministic ordering
49✔
330
        sort.Slice(toRun, func(i, j int) bool {
84✔
331
                return toRun[i].Name < toRun[j].Name
35✔
332
        })
35✔
333

334
        return toRun, nil
49✔
335
}
336

337
func (node *skyhookNode) NextStage(_package *v1alpha1.Package) *v1alpha1.Stage {
42✔
338
        return node.nodeState.NextStage(_package, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates())
42✔
339
}
42✔
340

341
func (node *skyhookNode) Changed() bool {
42✔
342
        return node.updated
42✔
343
}
42✔
344

345
func (node *skyhookNode) HasInterrupt(_package v1alpha1.Package) bool {
42✔
346
        return node.nodeState.HasInterrupt(_package, node.skyhook.GetConfigInterrupts(), node.skyhook.GetConfigUpdates())
42✔
347
}
42✔
348

349
func (node *skyhookNode) Taint(key string) {
×
350

×
NEW
351
        // dont add it if it exists already, dups will error
×
352
        for _, t := range node.Spec.Taints {
×
353
                if t.Key == key {
×
354
                        return
×
355
                }
×
356
        }
357

358
        if node.Spec.Taints == nil {
×
359
                node.Spec.Taints = make([]corev1.Taint, 0)
×
360
        }
×
361

362
        node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
×
363
                Key:    key,
×
NEW
364
                Value:  node.GetSkyhook().Name,
×
365
                Effect: corev1.TaintEffectNoSchedule,
×
366
        })
×
367
        node.updated = true
×
368
}
369

370
func (node *skyhookNode) RemoveTaint(key string) {
×
371

×
372
        if len(node.Spec.Taints) == 0 {
×
373
                return
×
374
        }
×
375

376
        temp := node.Spec.Taints[:0]
×
377
        for _, t := range node.Spec.Taints {
×
378
                if t.Key != key {
×
379
                        temp = append(temp, t)
×
380
                }
×
381
        }
382

383
        if len(temp) < len(node.Spec.Taints) {
×
NEW
384
                node.Spec.Taints = temp
×
385
                node.updated = true
×
386
        }
×
387
}
388

389
// HasSkyhookAnnotations returns true if the node has any annotation with the
390
// skyhook.nvidia.com/ prefix, indicating it has been previously touched by the Skyhook operator.
391
func (node *skyhookNode) HasSkyhookAnnotations() bool {
35✔
392
        for key := range node.Annotations {
70✔
393
                if strings.HasPrefix(key, v1alpha1.METADATA_PREFIX+"/") {
70✔
394
                        return true
35✔
395
                }
35✔
396
        }
397
        return false
35✔
398
}
399

400
func (node *skyhookNode) Cordon() {
28✔
401
        _, ok := node.Annotations[fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
28✔
402
        if !node.Spec.Unschedulable || !ok {
56✔
403
                node.Spec.Unschedulable = true
28✔
404
                node.Annotations[fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = "true"
28✔
405
                node.updated = true
28✔
406
        }
28✔
407
}
408

409
func (node *skyhookNode) Uncordon() {
42✔
410

42✔
411
        // if we hold a cordon remove it, also we dont want to remove a cordon if we dont have one...
42✔
412
        _, ok := node.Annotations[fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)]
42✔
413
        if ok {
70✔
414
                node.Spec.Unschedulable = false
28✔
415
                delete(node.Annotations, fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName))
28✔
416
                node.updated = true
28✔
417
        }
28✔
418
}
419

420
func (node *skyhookNode) Reset() {
×
NEW
421

×
422
        delete(node.skyhook.Status.NodeState, node.Name)
×
423
        delete(node.skyhook.Status.NodeStatus, node.Name)
×
424
        node.skyhook.Status.Status = v1alpha1.StatusUnknown
×
425
        node.skyhook.Updated = true
×
426

×
427
        delete(node.Annotations, fmt.Sprintf("%s/cordon_", v1alpha1.METADATA_PREFIX))
×
428
        delete(node.Annotations, fmt.Sprintf("%s/nodeState_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name))
×
429
        delete(node.Annotations, fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name))
×
430

×
431
        delete(node.Labels, fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name))
×
432
        node.updated = true
×
433
}
×
434

435
func (node *skyhookNode) UpdateCondition() {
42✔
436
        readyReason, errorReason := "Incomplete", "Not Erroring"
42✔
437
        errorCondFound, condFound := false, false
42✔
438

42✔
439
        if node.Node.Status.Conditions == nil {
42✔
440
                node.Node.Status.Conditions = make([]corev1.NodeCondition, 0)
×
441
        }
×
442

443
        errorStatus, condStatus := corev1.ConditionFalse, corev1.ConditionTrue
42✔
444
        if node.IsComplete() {
84✔
445
                readyReason = "Complete"
42✔
446
                condStatus = corev1.ConditionFalse
42✔
447
        }
42✔
448

449
        for _, packageStatus := range node.nodeState {
84✔
450
                switch packageStatus.State {
42✔
451
                case v1alpha1.StateErroring, v1alpha1.StateUnknown:
28✔
452
                        errorReason = "Package(s) Erroring or Unknown"
28✔
453
                        errorStatus = corev1.ConditionTrue
28✔
454
                }
455
        }
456

457
        cond := corev1.NodeCondition{
42✔
458
                Type:               corev1.NodeConditionType(fmt.Sprintf("%s/%s/NotReady", v1alpha1.METADATA_PREFIX, node.skyhookName)),
42✔
459
                Status:             condStatus,
42✔
460
                LastHeartbeatTime:  metav1.Now(),
42✔
461
                LastTransitionTime: metav1.Now(),
42✔
462
                Reason:             readyReason,
42✔
463
                Message:            fmt.Sprintf("Skyhook %s Ready", node.skyhookName),
42✔
464
        }
42✔
465

42✔
466
        errorCond := corev1.NodeCondition{
42✔
467
                Type:               corev1.NodeConditionType(fmt.Sprintf("%s/%s/Erroring", v1alpha1.METADATA_PREFIX, node.skyhookName)),
42✔
468
                Status:             errorStatus,
42✔
469
                LastHeartbeatTime:  metav1.Now(),
42✔
470
                LastTransitionTime: metav1.Now(),
42✔
471
                Reason:             errorReason,
42✔
472
                Message:            fmt.Sprintf("Package Erroring or Unknown for %s", node.skyhookName),
42✔
473
        }
42✔
474

42✔
475
        for i, condition := range node.Node.Status.Conditions {
84✔
476
                switch condition.Type {
42✔
477
                case errorCond.Type:
42✔
478
                        errorCondFound = true
42✔
479
                        if condition.Reason != errorCond.Reason && condition.Message == errorCond.Message {
70✔
480
                                node.Node.Status.Conditions[i] = errorCond // update it with the new condition
28✔
481
                                node.updated = true
28✔
482
                        }
28✔
483
                case cond.Type:
42✔
484
                        condFound = true
42✔
485
                        if condition.Reason != cond.Reason && condition.Message == cond.Message {
84✔
486
                                node.Node.Status.Conditions[i] = cond // update it with the new condition
42✔
487
                                node.updated = true
42✔
488
                        }
42✔
489
                }
490
        }
491

492
        if !errorCondFound {
84✔
493
                node.Node.Status.Conditions = append([]corev1.NodeCondition{errorCond}, node.Node.Status.Conditions...)
42✔
494
                node.updated = true
42✔
495
        }
42✔
496
        if !condFound {
84✔
497
                node.Node.Status.Conditions = append([]corev1.NodeCondition{cond}, node.Node.Status.Conditions...)
42✔
498
                node.updated = true
42✔
499
        }
42✔
500
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc