• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 9206688050

23 May 2024 10:50AM UTC coverage: 39.655% (+0.06%) from 39.599%
9206688050

Pull #587

github

web-flow
Merge 5f3c4e903 into 87e4dadb1
Pull Request #587: Add more checks on generic plugin to discover discrepancies from the desired state

54 of 127 new or added lines in 7 files covered. (42.52%)

6 existing lines in 2 files now uncovered.

5175 of 13050 relevant lines covered (39.66%)

0.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.06
/controllers/drain_controller.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "sync"
23
        "time"
24

25
        corev1 "k8s.io/api/core/v1"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
        "k8s.io/apimachinery/pkg/labels"
29
        "k8s.io/apimachinery/pkg/runtime"
30
        "k8s.io/apimachinery/pkg/types"
31
        "k8s.io/apimachinery/pkg/util/intstr"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        ctrl "sigs.k8s.io/controller-runtime"
35
        "sigs.k8s.io/controller-runtime/pkg/builder"
36
        "sigs.k8s.io/controller-runtime/pkg/client"
37
        "sigs.k8s.io/controller-runtime/pkg/controller"
38
        "sigs.k8s.io/controller-runtime/pkg/event"
39
        "sigs.k8s.io/controller-runtime/pkg/handler"
40
        "sigs.k8s.io/controller-runtime/pkg/log"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42

43
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
44
        constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
45
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/drain"
46
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
47
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
48
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
49
)
50

51
var (
	// oneNode is the default drain budget: at most one node unavailable at a time.
	oneNode = intstr.FromInt32(1)
	// defaultNpcl is the implicit pool used for nodes that do not match any
	// SriovNetworkPoolConfig: an empty node selector with MaxUnavailable of one.
	defaultNpcl = &sriovnetworkv1.SriovNetworkPoolConfig{Spec: sriovnetworkv1.SriovNetworkPoolConfigSpec{
		MaxUnavailable: &oneNode,
		NodeSelector:   &metav1.LabelSelector{}}}
)
57

58
// DrainReconcile reconciles drain/reboot requests expressed as annotations on
// Node and SriovNetworkNodeState objects, coordinating with the platform
// drainer implementation.
type DrainReconcile struct {
	client.Client
	Scheme   *runtime.Scheme
	recorder record.EventRecorder
	drainer  drain.DrainInterface

	// drainCheckMutex serializes the drain-budget check in tryDrainNode so
	// concurrent reconciles cannot both pass the MaxUnavailable check.
	drainCheckMutex sync.Mutex
}
66

67
func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, recorder record.EventRecorder, platformHelper platforms.Interface) (*DrainReconcile, error) {
1✔
68
        drainer, err := drain.NewDrainer(platformHelper)
1✔
69
        if err != nil {
1✔
70
                return nil, err
×
71
        }
×
72

73
        return &DrainReconcile{
1✔
74
                client,
1✔
75
                Scheme,
1✔
76
                recorder,
1✔
77
                drainer,
1✔
78
                sync.Mutex{}}, nil
1✔
79
}
80

81
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
82
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovnodestates,verbs=get;list;watch
83
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
84

85
// Reconcile is part of the main kubernetes reconciliation loop which aims to
86
// move the current state of the cluster closer to the desired state.
87
// For more details, check Reconcile and its Result here:
88
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
89
func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
90
        reqLogger := log.FromContext(ctx)
1✔
91
        reqLogger.Info("Reconciling Drain")
1✔
92

1✔
93
        req.Namespace = vars.Namespace
1✔
94

1✔
95
        // get node object
1✔
96
        node := &corev1.Node{}
1✔
97
        found, err := dr.getObject(ctx, req, node)
1✔
98
        if err != nil {
2✔
99
                reqLogger.Error(err, "failed to get node object")
1✔
100
                return ctrl.Result{}, err
1✔
101
        }
1✔
102
        if !found {
2✔
103
                reqLogger.Info("node not found don't, requeue the request")
1✔
104
                return ctrl.Result{}, nil
1✔
105
        }
1✔
106

107
        // get sriovNodeNodeState object
108
        nodeNetworkState := &sriovnetworkv1.SriovNetworkNodeState{}
1✔
109
        found, err = dr.getObject(ctx, req, nodeNetworkState)
1✔
110
        if err != nil {
1✔
111
                reqLogger.Error(err, "failed to get sriovNetworkNodeState object")
×
112
                return ctrl.Result{}, err
×
113
        }
×
114
        if !found {
2✔
115
                reqLogger.Info("sriovNetworkNodeState not found, don't requeue the request")
1✔
116
                return ctrl.Result{}, nil
1✔
117
        }
1✔
118

119
        // create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
120
        nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
1✔
121
        if err != nil {
1✔
122
                reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
×
123
                return ctrl.Result{}, err
×
124
        }
×
125

126
        // create the drain state annotation if it doesn't exist in the node object
127
        nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
1✔
128
        if err != nil {
1✔
129
                reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
×
130
                return ctrl.Result{}, err
×
131
        }
×
132

133
        reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)
1✔
134

1✔
135
        // Check the node request
1✔
136
        if nodeDrainAnnotation == constants.DrainIdle {
2✔
137
                // this cover the case the node is on idle
1✔
138

1✔
139
                // node request to be on idle and the currect state is idle
1✔
140
                // we don't do anything
1✔
141
                if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
2✔
142
                        reqLogger.Info("node and nodeState are on idle nothing todo")
1✔
143
                        return reconcile.Result{}, nil
1✔
144
                }
1✔
145

146
                // we have two options here:
147
                // 1. node request idle and the current status is drain complete
148
                // this means the daemon finish is work, so we need to clean the drain
149
                //
150
                // 2. the operator is still draining the node but maybe the sriov policy changed and the daemon
151
                //  doesn't need to drain anymore, so we can stop the drain
152
                if nodeStateDrainAnnotationCurrent == constants.DrainComplete ||
1✔
153
                        nodeStateDrainAnnotationCurrent == constants.Draining {
2✔
154
                        completed, err := dr.drainer.CompleteDrainNode(ctx, node)
1✔
155
                        if err != nil {
1✔
156
                                reqLogger.Error(err, "failed to complete drain on node")
×
157
                                dr.recorder.Event(nodeNetworkState,
×
158
                                        corev1.EventTypeWarning,
×
159
                                        "DrainController",
×
160
                                        "failed to drain node")
×
161
                                return ctrl.Result{}, err
×
162
                        }
×
163

164
                        // if we didn't manage to complete the un drain of the node we retry
165
                        if !completed {
1✔
166
                                reqLogger.Info("complete drain was not completed re queueing the request")
×
167
                                dr.recorder.Event(nodeNetworkState,
×
168
                                        corev1.EventTypeWarning,
×
169
                                        "DrainController",
×
170
                                        "node complete drain was not completed")
×
171
                                // TODO: make this time configurable
×
172
                                return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
×
173
                        }
×
174

175
                        // move the node state back to idle
176
                        err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
1✔
177
                        if err != nil {
1✔
178
                                reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
×
179
                                return ctrl.Result{}, err
×
180
                        }
×
181

182
                        reqLogger.Info("completed the un drain for node")
1✔
183
                        dr.recorder.Event(nodeNetworkState,
1✔
184
                                corev1.EventTypeWarning,
1✔
185
                                "DrainController",
1✔
186
                                "node un drain completed")
1✔
187
                        return ctrl.Result{}, nil
1✔
188
                }
189
        } else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
2✔
190
                // this cover the case a node request to drain or reboot
1✔
191

1✔
192
                // nothing to do here we need to wait for the node to move back to idle
1✔
193
                if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
1✔
194
                        reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
×
195
                        return ctrl.Result{}, nil
×
196
                }
×
197

198
                // we need to start the drain, but first we need to check that we can drain the node
199
                if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
2✔
200
                        result, err := dr.tryDrainNode(ctx, node)
1✔
201
                        if err != nil {
1✔
202
                                reqLogger.Error(err, "failed to check if we can drain the node")
×
203
                                return ctrl.Result{}, err
×
204
                        }
×
205

206
                        // in case we need to wait because we just to the max number of draining nodes
207
                        if result != nil {
2✔
208
                                return *result, nil
1✔
209
                        }
1✔
210
                }
211

212
                // class the drain function that will also call drain to other platform providers like openshift
213
                drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
1✔
214
                if err != nil {
1✔
215
                        reqLogger.Error(err, "error trying to drain the node")
×
216
                        dr.recorder.Event(nodeNetworkState,
×
217
                                corev1.EventTypeWarning,
×
218
                                "DrainController",
×
219
                                "failed to drain node")
×
220
                        return reconcile.Result{}, err
×
221
                }
×
222

223
                // if we didn't manage to complete the drain of the node we retry
224
                if !drained {
1✔
225
                        reqLogger.Info("the nodes was not drained re queueing the request")
×
226
                        dr.recorder.Event(nodeNetworkState,
×
227
                                corev1.EventTypeWarning,
×
228
                                "DrainController",
×
229
                                "node drain operation was not completed")
×
230
                        return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
×
231
                }
×
232

233
                // if we manage to drain we label the node state with drain completed and finish
234
                err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
1✔
235
                if err != nil {
2✔
236
                        reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
1✔
237
                        return ctrl.Result{}, err
1✔
238
                }
1✔
239

240
                reqLogger.Info("node drained successfully")
1✔
241
                dr.recorder.Event(nodeNetworkState,
1✔
242
                        corev1.EventTypeWarning,
1✔
243
                        "DrainController",
1✔
244
                        "node drain completed")
1✔
245
                return ctrl.Result{}, nil
1✔
246
        }
247

248
        reqLogger.Error(nil, "unexpected node drain annotation")
×
249
        return reconcile.Result{}, fmt.Errorf("unexpected node drain annotation")
×
250
}
251

252
func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, object client.Object) (bool, error) {
1✔
253
        err := dr.Get(ctx, req.NamespacedName, object)
1✔
254
        if err != nil {
2✔
255
                if errors.IsNotFound(err) {
2✔
256
                        return false, nil
1✔
257
                }
1✔
258
                return false, err
1✔
259
        }
260
        return true, nil
1✔
261
}
262

263
func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) {
1✔
264
        value, exist := object.GetAnnotations()[key]
1✔
265
        if !exist {
2✔
266
                err := utils.AnnotateObject(ctx, object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
1✔
267
                if err != nil {
1✔
268
                        return "", err
×
269
                }
×
270
                return constants.DrainIdle, nil
1✔
271
        }
272

273
        return value, nil
1✔
274
}
275

276
// tryDrainNode checks, inside a critical section, whether the requested node
// is allowed to start draining given its pool's MaxUnavailable budget. On
// success it annotates the node's SriovNetworkNodeState with Draining and
// returns (nil, nil). A non-nil reconcile.Result means the budget is
// currently exhausted and the caller should requeue.
func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
	// configure logs
	reqLogger := log.FromContext(ctx)
	reqLogger.Info("checkForNodeDrain():")

	// critical section: only one reconcile at a time may run the budget
	// check, otherwise two nodes could both pass it and exceed MaxUnavailable
	dr.drainCheckMutex.Lock()
	defer dr.drainCheckMutex.Unlock()

	// find the relevant node pool
	nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
	if err != nil {
		reqLogger.Error(err, "failed to find the pool for the requested node")
		return nil, err
	}

	// check how many nodes we can drain in parallel for the specific pool
	maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
	if err != nil {
		reqLogger.Error(err, "failed to calculate max unavailable")
		return nil, err
	}

	current := 0
	snns := &sriovnetworkv1.SriovNetworkNodeState{}

	var currentSnns *sriovnetworkv1.SriovNetworkNodeState
	for _, nodeObj := range nodeList {
		err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
		if err != nil {
			if errors.IsNotFound(err) {
				reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
				continue
			}
			return nil, err
		}

		// keep a copy of the requested node's state; we annotate it below
		if snns.GetName() == node.GetName() {
			currentSnns = snns.DeepCopy()
		}

		// both Draining and DrainComplete consume the unavailable budget:
		// a DrainComplete node has not yet returned to idle
		if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
			utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
			current++
		}
	}
	reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
	reqLogger.Info("Count of draining", "drainingNodes", current)

	// a maxUnv of -1 (NOTE(review): the old comment said "zero", but the code
	// checks -1) means we drain all the nodes in parallel without a limit
	if maxUnv == -1 {
		reqLogger.Info("draining all the nodes in parallel")
	} else if current >= maxUnv {
		// the node requested to be drained, but we are at the limit so we re-enqueue the request
		reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
		// TODO: make this time configurable
		return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
	}

	if currentSnns == nil {
		return nil, fmt.Errorf("failed to find sriov network node state for requested node")
	}

	// reserve a slot in the budget by marking the node state as Draining
	err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
	if err != nil {
		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
		return nil, err
	}

	return nil, nil
}
347

348
func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
1✔
349
        logger := log.FromContext(ctx)
1✔
350
        logger.Info("findNodePoolConfig():")
1✔
351
        // get all the sriov network pool configs
1✔
352
        npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{}
1✔
353
        err := dr.List(ctx, npcl)
1✔
354
        if err != nil {
1✔
355
                logger.Error(err, "failed to list sriovNetworkPoolConfig")
×
356
                return nil, nil, err
×
357
        }
×
358

359
        selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
1✔
360
        nodesInPools := map[string]interface{}{}
1✔
361

1✔
362
        for _, npc := range npcl.Items {
2✔
363
                // we skip hw offload objects
1✔
364
                if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
1✔
UNCOV
365
                        continue
×
366
                }
367

368
                if npc.Spec.NodeSelector == nil {
2✔
369
                        npc.Spec.NodeSelector = &metav1.LabelSelector{}
1✔
370
                }
1✔
371

372
                selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector)
1✔
373
                if err != nil {
1✔
374
                        logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector)
×
375
                        return nil, nil, err
×
376
                }
×
377

378
                if selector.Matches(labels.Set(node.Labels)) {
2✔
379
                        selectedNpcl = append(selectedNpcl, npc.DeepCopy())
1✔
380
                }
1✔
381

382
                nodeList := &corev1.NodeList{}
1✔
383
                err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
1✔
384
                if err != nil {
1✔
385
                        logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector",
×
386
                                "machineConfigPoolName", npc,
×
387
                                "nodeSelector", npc.Spec.NodeSelector)
×
388
                        return nil, nil, err
×
389
                }
×
390

391
                for _, nodeName := range nodeList.Items {
2✔
392
                        nodesInPools[nodeName.Name] = nil
1✔
393
                }
1✔
394
        }
395

396
        if len(selectedNpcl) > 1 {
1✔
397
                // don't allow the node to be part of multiple pools
×
398
                err = fmt.Errorf("node is part of more then one pool")
×
399
                logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl)
×
400
                return nil, nil, err
×
401
        } else if len(selectedNpcl) == 1 {
2✔
402
                // found one pool for our node
1✔
403
                logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0])
1✔
404
                selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector)
1✔
405
                if err != nil {
1✔
406
                        logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector)
×
407
                        return nil, nil, err
×
408
                }
×
409

410
                // list all the nodes that are also part of this pool and return them
411
                nodeList := &corev1.NodeList{}
1✔
412
                err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
1✔
413
                if err != nil {
1✔
414
                        logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector)
×
415
                        return nil, nil, err
×
416
                }
×
417

418
                return selectedNpcl[0], nodeList.Items, nil
1✔
419
        } else {
1✔
420
                // in this case we get all the nodes and remove the ones that already part of any pool
1✔
421
                logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl)
1✔
422
                nodeList := &corev1.NodeList{}
1✔
423
                err = dr.List(ctx, nodeList)
1✔
424
                if err != nil {
1✔
425
                        logger.Error(err, "failed to list all the nodes")
×
426
                        return nil, nil, err
×
427
                }
×
428

429
                defaultNodeLists := []corev1.Node{}
1✔
430
                for _, nodeObj := range nodeList.Items {
2✔
431
                        if _, exist := nodesInPools[nodeObj.Name]; !exist {
2✔
432
                                defaultNodeLists = append(defaultNodeLists, nodeObj)
1✔
433
                        }
1✔
434
                }
435
                return defaultNpcl, defaultNodeLists, nil
1✔
436
        }
437
}
438

439
// SetupWithManager sets up the controller with the Manager.
440
func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error {
1✔
441
        createUpdateEnqueue := handler.Funcs{
1✔
442
                CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
2✔
443
                        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
1✔
444
                                Namespace: vars.Namespace,
1✔
445
                                Name:      e.Object.GetName(),
1✔
446
                        }})
1✔
447
                },
1✔
448
                UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
×
449
                        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
×
450
                                Namespace: vars.Namespace,
×
451
                                Name:      e.ObjectNew.GetName(),
×
452
                        }})
×
453
                },
×
454
        }
455

456
        // Watch for spec and annotation changes
457
        nodePredicates := builder.WithPredicates(DrainAnnotationPredicate{})
1✔
458
        nodeStatePredicates := builder.WithPredicates(DrainStateAnnotationPredicate{})
1✔
459

1✔
460
        return ctrl.NewControllerManagedBy(mgr).
1✔
461
                WithOptions(controller.Options{MaxConcurrentReconciles: 50}).
1✔
462
                For(&corev1.Node{}, nodePredicates).
1✔
463
                Watches(&sriovnetworkv1.SriovNetworkNodeState{}, createUpdateEnqueue, nodeStatePredicates).
1✔
464
                Complete(dr)
1✔
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc