• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 13526205039

25 Feb 2025 04:34PM UTC coverage: 48.882% (+0.9%) from 48.008%
13526205039

Pull #788

github

web-flow
Merge 3e6b91cf3 into d7c9458e0
Pull Request #788: Daemon redesign - using controller-runtime

273 of 615 new or added lines in 18 files covered. (44.39%)

199 existing lines in 12 files now uncovered.

7324 of 14983 relevant lines covered (48.88%)

0.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.59
/controllers/drain_controller_helper.go
1
package controllers
2

3
import (
4
        "context"
5
        "fmt"
6

7
        "github.com/go-logr/logr"
8
        corev1 "k8s.io/api/core/v1"
9
        "k8s.io/apimachinery/pkg/api/errors"
10
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11
        "k8s.io/apimachinery/pkg/labels"
12
        ctrl "sigs.k8s.io/controller-runtime"
13
        "sigs.k8s.io/controller-runtime/pkg/client"
14
        "sigs.k8s.io/controller-runtime/pkg/log"
15
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
16

17
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
18
        constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
19
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
20
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
21
)
22

23
func (dr *DrainReconcile) handleNodeIdleNodeStateDrainingOrCompleted(ctx context.Context,
24
        reqLogger *logr.Logger,
25
        node *corev1.Node,
26
        nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState) (ctrl.Result, error) {
1✔
27
        completed, err := dr.drainer.CompleteDrainNode(ctx, node)
1✔
28
        if err != nil {
1✔
29
                reqLogger.Error(err, "failed to complete drain on node")
×
30
                dr.recorder.Event(nodeNetworkState,
×
31
                        corev1.EventTypeWarning,
×
32
                        "DrainController",
×
33
                        "failed to drain node")
×
34
                return ctrl.Result{}, err
×
35
        }
×
36

37
        // if we didn't manage to complete the un drain of the node we retry
38
        if !completed {
1✔
39
                reqLogger.Info("complete drain was not completed re queueing the request")
×
40
                dr.recorder.Event(nodeNetworkState,
×
41
                        corev1.EventTypeWarning,
×
42
                        "DrainController",
×
43
                        "node complete drain was not completed")
×
44
                // TODO: make this time configurable
×
NEW
45
                return reconcile.Result{RequeueAfter: constants.DrainControllerRequeueTime}, nil
×
46
        }
×
47

48
        // move the node state back to idle
49
        err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
1✔
50
        if err != nil {
1✔
51
                reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
×
52
                return ctrl.Result{}, err
×
53
        }
×
54

55
        reqLogger.Info("completed the un drain for node")
1✔
56
        dr.recorder.Event(nodeNetworkState,
1✔
57
                corev1.EventTypeWarning,
1✔
58
                "DrainController",
1✔
59
                "node un drain completed")
1✔
60
        return ctrl.Result{}, nil
1✔
61
}
62

63
func (dr *DrainReconcile) handleNodeDrainOrReboot(ctx context.Context,
64
        reqLogger *logr.Logger,
65
        node *corev1.Node,
66
        nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState,
67
        nodeDrainAnnotation,
68
        nodeStateDrainAnnotationCurrent string) (ctrl.Result, error) {
1✔
69
        // nothing to do here we need to wait for the node to move back to idle
1✔
70
        if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
2✔
71
                reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
1✔
72
                return ctrl.Result{}, nil
1✔
73
        }
1✔
74

75
        // we need to start the drain, but first we need to check that we can drain the node
76
        if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
2✔
77
                result, err := dr.tryDrainNode(ctx, node)
1✔
78
                if err != nil {
1✔
79
                        reqLogger.Error(err, "failed to check if we can drain the node")
×
80
                        return ctrl.Result{}, err
×
81
                }
×
82

83
                // in case we need to wait because we just to the max number of draining nodes
84
                if result != nil {
2✔
85
                        return *result, nil
1✔
86
                }
1✔
87
        }
88

89
        // call the drain function that will also call drain to other platform providers like openshift
90
        drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
1✔
91
        if err != nil {
1✔
92
                reqLogger.Error(err, "error trying to drain the node")
×
93
                dr.recorder.Event(nodeNetworkState,
×
94
                        corev1.EventTypeWarning,
×
95
                        "DrainController",
×
96
                        "failed to drain node")
×
97
                return reconcile.Result{}, err
×
98
        }
×
99

100
        // if we didn't manage to complete the drain of the node we retry
101
        if !drained {
1✔
102
                reqLogger.Info("the nodes was not drained re queueing the request")
×
103
                dr.recorder.Event(nodeNetworkState,
×
104
                        corev1.EventTypeWarning,
×
105
                        "DrainController",
×
106
                        "node drain operation was not completed")
×
NEW
107
                return reconcile.Result{RequeueAfter: constants.DrainControllerRequeueTime}, nil
×
108
        }
×
109

110
        // if we manage to drain we label the node state with drain completed and finish
111
        err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
1✔
112
        if err != nil {
2✔
113
                reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
1✔
114
                return ctrl.Result{}, err
1✔
115
        }
1✔
116

117
        reqLogger.Info("node drained successfully")
1✔
118
        dr.recorder.Event(nodeNetworkState,
1✔
119
                corev1.EventTypeWarning,
1✔
120
                "DrainController",
1✔
121
                "node drain completed")
1✔
122
        return ctrl.Result{}, nil
1✔
123
}
124

125
// tryDrainNode checks, under the shared drain mutex, whether the given node may
// start draining without exceeding its pool's parallel-drain budget.
// It returns a non-nil Result when the caller should requeue (budget exhausted),
// or (nil, nil) after successfully annotating the node's state as draining.
func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
	// configure logs
	reqLogger := log.FromContext(ctx)
	reqLogger.Info("checkForNodeDrain():")

	//critical section we need to check if we can start the draining
	dr.drainCheckMutex.Lock()
	defer dr.drainCheckMutex.Unlock()

	// find the relevant node pool
	nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
	if err != nil {
		reqLogger.Error(err, "failed to find the pool for the requested node")
		return nil, err
	}

	// check how many nodes we can drain in parallel for the specific pool
	maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
	if err != nil {
		reqLogger.Error(err, "failed to calculate max unavailable")
		return nil, err
	}

	current := 0
	// snns is reused as the Get target on every iteration; the state for our own
	// node is preserved via DeepCopy into currentSnns below
	snns := &sriovnetworkv1.SriovNetworkNodeState{}

	var currentSnns *sriovnetworkv1.SriovNetworkNodeState
	// count the nodes in the pool that are already draining (or done draining but
	// not yet back to idle), and locate the state object for the requested node
	for _, nodeObj := range nodeList {
		err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
		if err != nil {
			if errors.IsNotFound(err) {
				reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
				continue
			}
			return nil, err
		}

		if snns.GetName() == node.GetName() {
			currentSnns = snns.DeepCopy()
		}

		if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
			utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
			current++
		}
	}
	reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
	reqLogger.Info("Count of draining", "drainingNodes", current)

	// a maxUnv of -1 means we drain all the nodes in parallel without a limit
	if maxUnv == -1 {
		reqLogger.Info("draining all the nodes in parallel")
	} else if current >= maxUnv {
		// the node requested to be drained, but we are at the limit so we re-enqueue the request
		reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
		// TODO: make this time configurable
		return &reconcile.Result{RequeueAfter: constants.DrainControllerRequeueTime}, nil
	}

	if currentSnns == nil {
		return nil, fmt.Errorf("failed to find sriov network node state for requested node")
	}

	// mark the node state as draining so it counts against the pool budget
	err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
	if err != nil {
		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
		return nil, err
	}

	return nil, nil
}
196

197
func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
1✔
198
        logger := log.FromContext(ctx)
1✔
199
        logger.Info("findNodePoolConfig():")
1✔
200
        // get all the sriov network pool configs
1✔
201
        npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{}
1✔
202
        err := dr.List(ctx, npcl)
1✔
203
        if err != nil {
1✔
204
                logger.Error(err, "failed to list sriovNetworkPoolConfig")
×
205
                return nil, nil, err
×
206
        }
×
207

208
        selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
1✔
209
        nodesInPools := map[string]interface{}{}
1✔
210

1✔
211
        for _, npc := range npcl.Items {
2✔
212
                // we skip hw offload objects
1✔
213
                if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
1✔
214
                        continue
×
215
                }
216

217
                if npc.Spec.NodeSelector == nil {
2✔
218
                        npc.Spec.NodeSelector = &metav1.LabelSelector{}
1✔
219
                }
1✔
220

221
                selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector)
1✔
222
                if err != nil {
1✔
223
                        logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector)
×
224
                        return nil, nil, err
×
225
                }
×
226

227
                if selector.Matches(labels.Set(node.Labels)) {
2✔
228
                        selectedNpcl = append(selectedNpcl, npc.DeepCopy())
1✔
229
                }
1✔
230

231
                nodeList := &corev1.NodeList{}
1✔
232
                err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
1✔
233
                if err != nil {
1✔
234
                        logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector",
×
235
                                "machineConfigPoolName", npc,
×
236
                                "nodeSelector", npc.Spec.NodeSelector)
×
237
                        return nil, nil, err
×
238
                }
×
239

240
                for _, nodeName := range nodeList.Items {
2✔
241
                        nodesInPools[nodeName.Name] = nil
1✔
242
                }
1✔
243
        }
244

245
        if len(selectedNpcl) > 1 {
1✔
246
                // don't allow the node to be part of multiple pools
×
247
                err = fmt.Errorf("node is part of more then one pool")
×
248
                logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl)
×
249
                return nil, nil, err
×
250
        } else if len(selectedNpcl) == 1 {
2✔
251
                // found one pool for our node
1✔
252
                logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0])
1✔
253
                selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector)
1✔
254
                if err != nil {
1✔
255
                        logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector)
×
256
                        return nil, nil, err
×
257
                }
×
258

259
                // list all the nodes that are also part of this pool and return them
260
                nodeList := &corev1.NodeList{}
1✔
261
                err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
1✔
262
                if err != nil {
1✔
263
                        logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector)
×
264
                        return nil, nil, err
×
265
                }
×
266

267
                return selectedNpcl[0], nodeList.Items, nil
1✔
268
        } else {
1✔
269
                // in this case we get all the nodes and remove the ones that already part of any pool
1✔
270
                logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultPoolConfig)
1✔
271
                nodeList := &corev1.NodeList{}
1✔
272
                err = dr.List(ctx, nodeList)
1✔
273
                if err != nil {
1✔
274
                        logger.Error(err, "failed to list all the nodes")
×
275
                        return nil, nil, err
×
276
                }
×
277

278
                defaultNodeLists := []corev1.Node{}
1✔
279
                for _, nodeObj := range nodeList.Items {
2✔
280
                        if _, exist := nodesInPools[nodeObj.Name]; !exist {
2✔
281
                                defaultNodeLists = append(defaultNodeLists, nodeObj)
1✔
282
                        }
1✔
283
                }
284
                return defaultPoolConfig, defaultNodeLists, nil
1✔
285
        }
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc