• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 13370907002

17 Feb 2025 01:03PM UTC coverage: 47.344% (+0.05%) from 47.291%
13370907002

Pull #840

github

web-flow
Merge c20fba03a into b2c487c1a
Pull Request #840: Support draining DaemonSet pods using sriov devices

20 of 54 new or added lines in 1 file covered. (37.04%)

2 existing lines in 1 file now uncovered.

7282 of 15381 relevant lines covered (47.34%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.33
/pkg/drain/drainer.go
1
package drain
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        corev1 "k8s.io/api/core/v1"
10
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11
        "k8s.io/apimachinery/pkg/util/wait"
12
        "k8s.io/client-go/kubernetes"
13
        "k8s.io/kubectl/pkg/drain"
14
        "sigs.k8s.io/controller-runtime/pkg/log"
15

16
        constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
17
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
18
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
19
)
20

21
// writer adapts a structured logging function to the io.Writer interface
// so it can be handed to APIs (such as kubectl's drain helper) that expect
// a plain output stream.
type writer struct {
	logFunc func(msg string, keysAndValues ...interface{})
}

// Write forwards p, converted to a string, to the wrapped logFunc.
// It always reports the full length of p as written and never returns an error.
func (w writer) Write(p []byte) (n int, err error) {
	msg := string(p)
	w.logFunc(msg)
	return len(p), nil
}
31

32
// DrainInterface is the contract for cordoning and draining a node and for
// completing (un-cordoning) the drain afterwards.
type DrainInterface interface {
	// DrainNode cordons the node and drains pods from it. When the bool
	// argument (fullNodeDrain) is true all pods on the node are drained;
	// otherwise only pods using SR-IOV devices are removed. The returned
	// bool reports whether the drain completed (false means re-queue).
	DrainNode(context.Context, *corev1.Node, bool) (bool, error)
	// CompleteDrainNode un-cordons the node and runs platform-specific
	// completion (e.g. unpausing the OpenShift machine config pool).
	// The returned bool reports whether completion finished.
	CompleteDrainNode(context.Context, *corev1.Node) (bool, error)
}
36

37
// Drainer implements DrainInterface using a kubernetes client for
// cordon/drain operations and platform helpers for OpenShift-specific hooks.
type Drainer struct {
	kubeClient      kubernetes.Interface // client used to cordon, drain, and list pods
	platformHelpers platforms.Interface  // platform hooks run before/after draining
}
41

42
func NewDrainer(platformHelpers platforms.Interface) (DrainInterface, error) {
1✔
43
        kclient, err := kubernetes.NewForConfig(vars.Config)
1✔
44
        if err != nil {
1✔
45
                return nil, err
×
46
        }
×
47

48
        return &Drainer{
1✔
49
                kubeClient:      kclient,
1✔
50
                platformHelpers: platformHelpers,
1✔
51
        }, err
1✔
52
}
53

54
// DrainNode the function cordon a node and drain pods from it
// if fullNodeDrain true all the pods on the system will get drained
// for openshift system we also pause the machine config pool this machine is part of it
//
// Returns (true, nil) when the drain completed, (false, nil) when the
// platform pre-drain step did not finish and the request should be re-queued,
// and (false, err) on failure.
func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrain bool) (bool, error) {
	reqLogger := log.FromContext(ctx).WithValues("drain node", node.Name)
	reqLogger.Info("drainNode(): Node drain requested", "node", node.Name)

	// Platform-specific preparation (e.g. pausing the OpenShift MCP).
	completed, err := d.platformHelpers.OpenshiftBeforeDrainNode(ctx, node)
	if err != nil {
		reqLogger.Error(err, "error running OpenshiftDrainNode")
		return false, err
	}

	// Pre-drain not finished yet: ask the caller to re-queue rather than block.
	if !completed {
		reqLogger.Info("OpenshiftDrainNode did not finish re queue the node request")
		return false, nil
	}

	drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
	// Retry the cordon+drain sequence up to 5 times with exponential backoff
	// (10s, 20s, 40s, ...).
	backoff := wait.Backoff{
		Steps:    5,
		Duration: 10 * time.Second,
		Factor:   2,
	}
	// lastErr records the most recent failure inside the backoff closure so
	// it can be reported if all retries are exhausted.
	var lastErr error

	reqLogger.Info("drainNode(): Start draining")
	if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
		// Cordon first so no new pods land on the node while draining.
		err := drain.RunCordonOrUncordon(drainHelper, node, true)
		if err != nil {
			lastErr = err
			reqLogger.Info("drainNode(): Cordon failed, retrying", "error", err)
			return false, nil
		}
		err = drain.RunNodeDrain(drainHelper, node.Name)
		if err != nil {
			lastErr = err
			reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
			return false, nil
		}

		// on full drain there is no need to try and remove pods that are owned by DaemonSets
		// as we are going to reboot the node in any case.
		if fullNodeDrain {
			return true, nil
		}

		// Partial drain: additionally evict DaemonSet pods that use SR-IOV
		// devices (the core drain skips DaemonSet-owned pods).
		err = d.removeDaemonSetsFromNode(ctx, node.Name)
		if err != nil {
			lastErr = err
			return false, nil
		}

		return true, nil
	}); err != nil {
		// wait.Interrupted means the backoff ran out of steps; surface the
		// last underlying failure for context.
		if wait.Interrupted(err) {
			reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr)
		}
		reqLogger.Info("drainNode(): failed to drain node", "error", err)
		return false, err
	}
	reqLogger.Info("drainNode(): Drain completed")
	return true, nil
}
118

119
// CompleteDrainNode run un-cordon for the requested node
120
// for openshift system we also remove the pause from the machine config pool this node is part of
121
// only if we are the last draining node on that pool
122
func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (bool, error) {
1✔
123
        logger := log.FromContext(ctx)
1✔
124
        logger.Info("CompleteDrainNode:()")
1✔
125

1✔
126
        // Create drain helper object
1✔
127
        // full drain is not important here
1✔
128
        drainHelper := createDrainHelper(d.kubeClient, ctx, false)
1✔
129

1✔
130
        // run the un cordon function on the node
1✔
131
        if err := drain.RunCordonOrUncordon(drainHelper, node, false); err != nil {
1✔
132
                logger.Error(err, "failed to un-cordon the node")
×
133
                return false, err
×
134
        }
×
135

136
        // call the openshift complete drain to unpause the MCP
137
        // only if we are the last draining node in the pool
138
        completed, err := d.platformHelpers.OpenshiftAfterCompleteDrainNode(ctx, node)
1✔
139
        if err != nil {
1✔
140
                logger.Error(err, "failed to complete openshift draining")
×
141
                return false, err
×
142
        }
×
143

144
        logger.V(2).Info("CompleteDrainNode:()", "drainCompleted", completed)
1✔
145
        return completed, nil
1✔
146
}
147

148
// removeDaemonSetsFromNode go over all the remain pods and search for DaemonSets that have SR-IOV devices to remove them
149
// we can't use the drain from core kubernetes as it doesn't support removing pods that are part of a DaemonSets
150
func (d *Drainer) removeDaemonSetsFromNode(ctx context.Context, nodeName string) error {
1✔
151
        reqLogger := log.FromContext(ctx)
1✔
152
        reqLogger.Info("drainNode(): remove DaemonSets using sriov devices from node", "nodeName", nodeName)
1✔
153

1✔
154
        podList, err := d.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)})
1✔
155
        if err != nil {
1✔
NEW
156
                reqLogger.Info("drainNode(): Failed to list pods, retrying", "error", err)
×
NEW
157
                return err
×
NEW
158
        }
×
159

160
        // remove pods that are owned by a DaemonSet and use SR-IOV devices
161
        dsPodsList := getDsPodsToRemove(podList)
1✔
162
        drainHelper := createDrainHelper(d.kubeClient, ctx, true)
1✔
163
        err = drainHelper.DeleteOrEvictPods(dsPodsList)
1✔
164
        if err != nil {
1✔
NEW
165
                reqLogger.Error(err, "failed to delete or-evict pods from node", "nodeName", nodeName)
×
NEW
166
        }
×
167
        return err
1✔
168
}
169

170
// createDrainHelper function to create a drain helper
// if fullDrain is false we only remove pods that have the resourcePrefix
// if not we remove all the pods in the node
func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, fullDrain bool) *drain.Helper {
	logger := log.FromContext(ctx)
	drainer := &drain.Helper{
		Client: kubeClient,
		// Force: also remove pods not managed by a controller.
		Force: true,
		// DaemonSet pods are handled separately in removeDaemonSetsFromNode.
		IgnoreAllDaemonSets: true,
		DeleteEmptyDirData:  true,
		// -1 means use each pod's own termination grace period.
		GracePeriodSeconds: -1,
		Timeout:            90 * time.Second,
		// Log every pod removal, distinguishing eviction from deletion.
		OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
			verbStr := constants.DrainDeleted
			if usingEviction {
				verbStr = constants.DrainEvicted
			}
			log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
		},
		Ctx: ctx,
		// Route the helper's stdout/stderr style output through our logger.
		Out:    writer{logger.Info},
		ErrOut: writer{func(msg string, kv ...interface{}) { logger.Error(nil, msg, kv...) }},
	}

	// when we just want to drain and not reboot we can only remove the pods using sriov devices
	if !fullDrain {
		deleteFunction := func(p corev1.Pod) drain.PodDeleteStatus {
			if podHasSRIOVDevice(&p) {
				return drain.PodDeleteStatus{
					Delete:  true,
					Reason:  "pod contains SR-IOV device",
					Message: "SR-IOV network operator draining the node",
				}
			}
			// Pods without SR-IOV devices are left in place on partial drain.
			return drain.PodDeleteStatus{Delete: false}
		}

		drainer.AdditionalFilters = []drain.PodFilter{deleteFunction}
	}

	return drainer
}
212

NEW
213
func podHasSRIOVDevice(p *corev1.Pod) bool {
×
NEW
214
        for _, c := range p.Spec.Containers {
×
NEW
215
                if c.Resources.Requests != nil {
×
NEW
216
                        for r := range c.Resources.Requests {
×
NEW
217
                                if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
×
NEW
218
                                        return true
×
NEW
219
                                }
×
220
                        }
221
                }
222
        }
223

NEW
224
        return false
×
225
}
226

NEW
227
func podsHasDSOwner(p *corev1.Pod) bool {
×
NEW
228
        for _, o := range p.OwnerReferences {
×
NEW
229
                if o.Kind == "DaemonSet" {
×
NEW
230
                        return true
×
NEW
231
                }
×
232
        }
233

NEW
234
        return false
×
235
}
236

237
func getDsPodsToRemove(pl *corev1.PodList) []corev1.Pod {
1✔
238
        podsToRemove := []corev1.Pod{}
1✔
239
        for _, pod := range pl.Items {
1✔
NEW
240
                if podsHasDSOwner(&pod) && podHasSRIOVDevice(&pod) {
×
NEW
241
                        podsToRemove = append(podsToRemove, pod)
×
NEW
242
                }
×
243
        }
244

245
        return podsToRemove
1✔
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc