• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 8467233545

28 Mar 2024 12:26PM UTC coverage: 38.344% (+0.7%) from 37.675%
8467233545

push

github

web-flow
Merge pull request #643 from ykulazhenkov/pr-turn-on-switchdev

[switchdev 9/9] Enable new switchdev implementation

189 of 289 new or added lines in 9 files covered. (65.4%)

26 existing lines in 8 files now uncovered.

4798 of 12513 relevant lines covered (38.34%)

0.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.68
/controllers/drain_controller.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "sync"
23
        "time"
24

25
        corev1 "k8s.io/api/core/v1"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
        "k8s.io/apimachinery/pkg/labels"
29
        "k8s.io/apimachinery/pkg/runtime"
30
        "k8s.io/apimachinery/pkg/types"
31
        "k8s.io/apimachinery/pkg/util/intstr"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        ctrl "sigs.k8s.io/controller-runtime"
35
        "sigs.k8s.io/controller-runtime/pkg/builder"
36
        "sigs.k8s.io/controller-runtime/pkg/client"
37
        "sigs.k8s.io/controller-runtime/pkg/controller"
38
        "sigs.k8s.io/controller-runtime/pkg/event"
39
        "sigs.k8s.io/controller-runtime/pkg/handler"
40
        "sigs.k8s.io/controller-runtime/pkg/log"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42

43
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
44
        constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
45
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/drain"
46
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
47
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
48
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
49
)
50

51
var (
	// oneNode is the default MaxUnavailable value: drain a single node at a time.
	oneNode     = intstr.FromInt32(1)
	// defaultNpcl is the pool configuration applied to nodes that do not match
	// any user-created SriovNetworkPoolConfig: empty node selector and at most
	// one node draining in parallel.
	defaultNpcl = &sriovnetworkv1.SriovNetworkPoolConfig{Spec: sriovnetworkv1.SriovNetworkPoolConfigSpec{
		MaxUnavailable: &oneNode,
		NodeSelector:   &metav1.LabelSelector{}}}
)
57

58
// DrainReconcile reconciles drain requests recorded as annotations on Node and
// SriovNetworkNodeState objects, driving nodes through the
// idle -> draining -> drain-complete -> idle cycle.
type DrainReconcile struct {
	client.Client
	Scheme   *runtime.Scheme
	recorder record.EventRecorder
	drainer  drain.DrainInterface

	// drainCheckMutex serializes the "may this node start draining" check in
	// tryDrainNode so concurrent reconciles cannot exceed the pool's
	// MaxUnavailable budget.
	drainCheckMutex sync.Mutex
}
66

67
func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, recorder record.EventRecorder, platformHelper platforms.Interface) (*DrainReconcile, error) {
1✔
68
        drainer, err := drain.NewDrainer(platformHelper)
1✔
69
        if err != nil {
1✔
70
                return nil, err
×
71
        }
×
72

73
        return &DrainReconcile{
1✔
74
                client,
1✔
75
                Scheme,
1✔
76
                recorder,
1✔
77
                drainer,
1✔
78
                sync.Mutex{}}, nil
1✔
79
}
80

81
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
82
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovnodestates,verbs=get;list;watch
83
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
84

85
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
//
// The request name is the node name; the namespace is forced to the operator
// namespace where SriovNetworkNodeState objects live. The node's drain
// annotation expresses the desired state (idle / drain / reboot) and the
// nodeState's current-drain annotation tracks progress.
func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	reqLogger := log.FromContext(ctx)
	reqLogger.Info("Reconciling Drain")

	req.Namespace = vars.Namespace

	// get node object
	node := &corev1.Node{}
	err := dr.getObject(ctx, req, node)
	if err != nil {
		reqLogger.Error(err, "failed to get node object")
		return ctrl.Result{}, err
	}

	// get sriovNodeNodeState object
	nodeNetworkState := &sriovnetworkv1.SriovNetworkNodeState{}
	err = dr.getObject(ctx, req, nodeNetworkState)
	if err != nil {
		reqLogger.Error(err, "failed to get sriovNetworkNodeState object")
		return ctrl.Result{}, err
	}

	// create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
	nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
	if err != nil {
		reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
		return ctrl.Result{}, err
	}

	// create the drain state annotation if it doesn't exist in the node object
	nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
	if err != nil {
		reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
		return ctrl.Result{}, err
	}

	reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)

	// Check the node request
	if nodeDrainAnnotation == constants.DrainIdle {
		// this covers the case the node is on idle

		// node requested to be on idle and the current state is idle
		// we don't do anything
		if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
			reqLogger.Info("node and nodeState are on idle nothing todo")
			return reconcile.Result{}, nil
		}

		// we have two options here:
		// 1. node request idle and the current status is drain complete
		// this means the daemon finished its work, so we need to clean the drain
		//
		// 2. the operator is still draining the node but maybe the sriov policy changed and the daemon
		//  doesn't need to drain anymore, so we can stop the drain
		if nodeStateDrainAnnotationCurrent == constants.DrainComplete ||
			nodeStateDrainAnnotationCurrent == constants.Draining {
			completed, err := dr.drainer.CompleteDrainNode(ctx, node)
			if err != nil {
				reqLogger.Error(err, "failed to complete drain on node")
				dr.recorder.Event(nodeNetworkState,
					corev1.EventTypeWarning,
					"DrainController",
					"failed to drain node")
				return ctrl.Result{}, err
			}

			// if we didn't manage to complete the un drain of the node we retry
			if !completed {
				reqLogger.Info("complete drain was not completed re queueing the request")
				dr.recorder.Event(nodeNetworkState,
					corev1.EventTypeWarning,
					"DrainController",
					"node complete drain was not completed")
				// TODO: make this time configurable
				return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
			}

			// move the node state back to idle
			err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
			if err != nil {
				reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
				return ctrl.Result{}, err
			}

			reqLogger.Info("completed the un drain for node")
			dr.recorder.Event(nodeNetworkState,
				corev1.EventTypeWarning,
				"DrainController",
				"node un drain completed")
			return ctrl.Result{}, nil
		}
	} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
		// this covers the case a node requests to drain or reboot

		// nothing to do here we need to wait for the node to move back to idle
		if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
			reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
			return ctrl.Result{}, nil
		}

		// we need to start the drain, but first we need to check that we can drain the node
		if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
			result, err := dr.tryDrainNode(ctx, node)
			if err != nil {
				reqLogger.Error(err, "failed to check if we can drain the node")
				return ctrl.Result{}, err
			}

			// in case we need to wait because we just reached the max number of draining nodes
			if result != nil {
				return *result, nil
			}
		}

		// call the drain function that will also call drain to other platform providers like openshift
		drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
		if err != nil {
			reqLogger.Error(err, "error trying to drain the node")
			dr.recorder.Event(nodeNetworkState,
				corev1.EventTypeWarning,
				"DrainController",
				"failed to drain node")
			return reconcile.Result{}, err
		}

		// if we didn't manage to complete the drain of the node we retry
		if !drained {
			reqLogger.Info("the nodes was not drained re queueing the request")
			dr.recorder.Event(nodeNetworkState,
				corev1.EventTypeWarning,
				"DrainController",
				"node drain operation was not completed")
			return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
		}

		// if we manage to drain we label the node state with drain completed and finish
		err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
		if err != nil {
			reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
			return ctrl.Result{}, err
		}

		reqLogger.Info("node drained successfully")
		dr.recorder.Event(nodeNetworkState,
			corev1.EventTypeWarning,
			"DrainController",
			"node drain completed")
		return ctrl.Result{}, nil
	}

	// annotation held a value outside the known state machine — surface it
	reqLogger.Error(nil, "unexpected node drain annotation")
	return reconcile.Result{}, fmt.Errorf("unexpected node drain annotation")
}
243

244
func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, object client.Object) error {
1✔
245
        reqLogger := log.FromContext(ctx)
1✔
246
        reqLogger.Info("getObject():")
1✔
247

1✔
248
        err := dr.Get(ctx, req.NamespacedName, object)
1✔
249
        if err != nil {
2✔
250
                if errors.IsNotFound(err) {
2✔
251
                        reqLogger.Error(err, "object doesn't exist", "objectName", req.Name)
1✔
252
                        return nil
1✔
253
                }
1✔
254

255
                reqLogger.Error(err, "failed to get object from api re-queue the request", "objectName", req.Name)
1✔
256
                return err
1✔
257
        }
258

259
        return nil
1✔
260
}
261

262
func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) {
1✔
263
        value, exist := object.GetAnnotations()[key]
1✔
264
        if !exist {
2✔
265
                err := utils.AnnotateObject(ctx, object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
1✔
266
                if err != nil {
2✔
267
                        return "", err
1✔
268
                }
1✔
269
                return constants.DrainIdle, nil
1✔
270
        }
271

272
        return value, nil
1✔
273
}
274

275
// tryDrainNode decides, inside a critical section, whether this node may start
// draining given its pool's MaxUnavailable budget. On success it marks the
// node's SriovNetworkNodeState as Draining and returns (nil, nil). When the
// pool is already at capacity it returns a non-nil Result asking the caller to
// re-queue. Errors from pool lookup, budget calculation, or annotation are
// returned as-is.
func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
	// configure logs
	reqLogger := log.FromContext(ctx)
	reqLogger.Info("checkForNodeDrain():")

	//critical section we need to check if we can start the draining
	dr.drainCheckMutex.Lock()
	defer dr.drainCheckMutex.Unlock()

	// find the relevant node pool
	nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
	if err != nil {
		reqLogger.Error(err, "failed to find the pool for the requested node")
		return nil, err
	}

	// check how many nodes we can drain in parallel for the specific pool
	maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
	if err != nil {
		reqLogger.Error(err, "failed to calculate max unavailable")
		return nil, err
	}

	current := 0
	// snns is reused as the Get target on every loop iteration; the entry
	// matching our node is DeepCopy'd into currentSnns so later overwrites
	// don't clobber it.
	snns := &sriovnetworkv1.SriovNetworkNodeState{}

	var currentSnns *sriovnetworkv1.SriovNetworkNodeState
	for _, nodeObj := range nodeList {
		err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
		if err != nil {
			if errors.IsNotFound(err) {
				reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
				continue
			}
			return nil, err
		}

		if snns.GetName() == node.GetName() {
			currentSnns = snns.DeepCopy()
		}

		// nodes that are Draining or DrainComplete still count against the
		// pool's parallel-drain budget
		if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
			utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
			current++
		}
	}
	reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
	reqLogger.Info("Count of draining", "drainingNodes", current)

	// if maxUnv is -1 this means we drain all the nodes in parallel without a limit
	if maxUnv == -1 {
		reqLogger.Info("draining all the nodes in parallel")
	} else if current >= maxUnv {
		// the node requested to be drained, but we are at the limit so we re-enqueue the request
		reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
		// TODO: make this time configurable
		return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
	}

	// the requested node must have a nodeState in the pool's node list
	if currentSnns == nil {
		return nil, fmt.Errorf("failed to find sriov network node state for requested node")
	}

	err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
	if err != nil {
		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
		return nil, err
	}

	return nil, nil
}
346

347
// findNodePoolConfig returns the SriovNetworkPoolConfig whose node selector
// matches node, together with all nodes belonging to that pool. A node
// matching more than one pool is an error. A node matching no pool gets
// defaultNpcl (MaxUnavailable of one) and the list of all nodes that belong
// to no pool. Pools with OvsHardwareOffloadConfig set are ignored.
func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
	logger := log.FromContext(ctx)
	logger.Info("findNodePoolConfig():")
	// get all the sriov network pool configs
	npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{}
	err := dr.List(ctx, npcl)
	if err != nil {
		logger.Error(err, "failed to list sriovNetworkPoolConfig")
		return nil, nil, err
	}

	selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
	// set of node names covered by ANY pool, used below to build the
	// "no pool" default node list
	nodesInPools := map[string]interface{}{}

	for _, npc := range npcl.Items {
		// we skip hw offload objects
		if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
			continue
		}

		// a nil selector is treated as match-everything
		if npc.Spec.NodeSelector == nil {
			npc.Spec.NodeSelector = &metav1.LabelSelector{}
		}

		selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector)
		if err != nil {
			logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector)
			return nil, nil, err
		}

		if selector.Matches(labels.Set(node.Labels)) {
			selectedNpcl = append(selectedNpcl, npc.DeepCopy())
		}

		// record every node this pool covers, even when our node doesn't match
		nodeList := &corev1.NodeList{}
		err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
		if err != nil {
			logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector",
				"machineConfigPoolName", npc,
				"nodeSelector", npc.Spec.NodeSelector)
			return nil, nil, err
		}

		for _, nodeName := range nodeList.Items {
			nodesInPools[nodeName.Name] = nil
		}
	}

	if len(selectedNpcl) > 1 {
		// don't allow the node to be part of multiple pools
		err = fmt.Errorf("node is part of more then one pool")
		logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl)
		return nil, nil, err
	} else if len(selectedNpcl) == 1 {
		// found one pool for our node
		logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0])
		selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector)
		if err != nil {
			logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector)
			return nil, nil, err
		}

		// list all the nodes that are also part of this pool and return them
		nodeList := &corev1.NodeList{}
		err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
		if err != nil {
			logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector)
			return nil, nil, err
		}

		return selectedNpcl[0], nodeList.Items, nil
	} else {
		// in this case we get all the nodes and remove the ones that already part of any pool
		logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl)
		nodeList := &corev1.NodeList{}
		err = dr.List(ctx, nodeList)
		if err != nil {
			logger.Error(err, "failed to list all the nodes")
			return nil, nil, err
		}

		defaultNodeLists := []corev1.Node{}
		for _, nodeObj := range nodeList.Items {
			if _, exist := nodesInPools[nodeObj.Name]; !exist {
				defaultNodeLists = append(defaultNodeLists, nodeObj)
			}
		}
		return defaultNpcl, defaultNodeLists, nil
	}
}
437

438
// SetupWithManager sets up the controller with the Manager.
439
func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error {
1✔
440
        createUpdateEnqueue := handler.Funcs{
1✔
441
                CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
2✔
442
                        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
1✔
443
                                Namespace: vars.Namespace,
1✔
444
                                Name:      e.Object.GetName(),
1✔
445
                        }})
1✔
446
                },
1✔
447
                UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
×
448
                        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
×
449
                                Namespace: vars.Namespace,
×
450
                                Name:      e.ObjectNew.GetName(),
×
451
                        }})
×
452
                },
×
453
        }
454

455
        // Watch for spec and annotation changes
456
        nodePredicates := builder.WithPredicates(DrainAnnotationPredicate{})
1✔
457
        nodeStatePredicates := builder.WithPredicates(DrainStateAnnotationPredicate{})
1✔
458

1✔
459
        return ctrl.NewControllerManagedBy(mgr).
1✔
460
                WithOptions(controller.Options{MaxConcurrentReconciles: 50}).
1✔
461
                For(&corev1.Node{}, nodePredicates).
1✔
462
                Watches(&sriovnetworkv1.SriovNetworkNodeState{}, createUpdateEnqueue, nodeStatePredicates).
1✔
463
                Complete(dr)
1✔
464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc