• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been set to done!

k8snetworkplumbingwg / sriov-network-operator / 13856261115

14 Mar 2025 12:09PM UTC coverage: 48.251% (-0.6%) from 48.875%
13856261115

Pull #856

github

web-flow
Merge 2cb0a44e6 into 230f50baa
Pull Request #856: Reach MetaData service over IPv6

0 of 52 new or added lines in 1 file covered. (0.0%)

673 existing lines in 11 files now uncovered.

7350 of 15233 relevant lines covered (48.25%)

0.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.77
/pkg/drain/drainer.go
1
package drain
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        corev1 "k8s.io/api/core/v1"
10
        "k8s.io/apimachinery/pkg/util/wait"
11
        "k8s.io/client-go/kubernetes"
12
        "k8s.io/kubectl/pkg/drain"
13
        "sigs.k8s.io/controller-runtime/pkg/log"
14

15
        constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
16
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
17
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
18
)
19

20
// writer adapts a log.Log-style logging function to the io.Writer
// interface so it can be plugged into APIs that expect a Writer
// (e.g. the kubectl drain helper's Out/ErrOut streams).
type writer struct {
	logFunc func(msg string, keysAndValues ...interface{})
}

// Write forwards the buffer contents to logFunc as a single message.
// It never fails and always reports the full length of p as written.
func (w writer) Write(p []byte) (n int, err error) {
	msg := string(p)
	w.logFunc(msg)
	return len(p), nil
}
30

31
// DrainInterface abstracts node drain operations so callers can be
// exercised against alternative implementations.
type DrainInterface interface {
	// DrainNode cordons the node and drains pods from it. The first bool
	// requests a full drain (all pods) instead of SR-IOV-only; the second
	// flags a single-node cluster. Returns true when draining completed,
	// false when the request should be re-queued.
	DrainNode(context.Context, *corev1.Node, bool, bool) (bool, error)
	// CompleteDrainNode un-cordons the node and finishes platform-specific
	// drain bookkeeping. Returns true when the drain is fully complete.
	CompleteDrainNode(context.Context, *corev1.Node) (bool, error)
}
35

36
// Drainer implements DrainInterface using a Kubernetes client and
// platform-specific helpers (e.g. OpenShift machine config pool handling).
type Drainer struct {
	// kubeClient talks to the cluster API for cordon/drain operations.
	kubeClient      kubernetes.Interface
	// platformHelpers runs platform-specific hooks before/after draining.
	platformHelpers platforms.Interface
}
40

41
func NewDrainer(platformHelpers platforms.Interface) (DrainInterface, error) {
1✔
42
        kclient, err := kubernetes.NewForConfig(vars.Config)
1✔
43
        if err != nil {
1✔
44
                return nil, err
×
45
        }
×
46

47
        return &Drainer{
1✔
48
                kubeClient:      kclient,
1✔
49
                platformHelpers: platformHelpers,
1✔
50
        }, err
1✔
51
}
52

53
// DrainNode the function cordon a node and drain pods from it
54
// if fullNodeDrain true all the pods on the system will get drained
55
// for openshift system we also pause the machine config pool this machine is part of it
56
func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrain, singleNode bool) (bool, error) {
1✔
57
        reqLogger := log.FromContext(ctx).WithValues("drain node", node.Name)
1✔
58
        reqLogger.Info("drainNode(): Node drain requested", "node", node.Name)
1✔
59

1✔
60
        completed, err := d.platformHelpers.OpenshiftBeforeDrainNode(ctx, node)
1✔
61
        if err != nil {
1✔
62
                reqLogger.Error(err, "error running OpenshiftDrainNode")
×
63
                return false, err
×
64
        }
×
65

66
        if !completed {
1✔
67
                reqLogger.Info("OpenshiftDrainNode did not finish re queue the node request")
×
68
                return false, nil
×
69
        }
×
70

71
        // Check if we are on a single node, and we require a reboot/full-drain we just return
72
        if fullNodeDrain && singleNode {
2✔
73
                return true, nil
1✔
74
        }
1✔
75

76
        drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
1✔
77
        backoff := wait.Backoff{
1✔
78
                Steps:    5,
1✔
79
                Duration: 10 * time.Second,
1✔
80
                Factor:   2,
1✔
81
        }
1✔
82
        var lastErr error
1✔
83

1✔
84
        reqLogger.Info("drainNode(): Start draining")
1✔
85
        if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
2✔
86
                err := drain.RunCordonOrUncordon(drainHelper, node, true)
1✔
87
                if err != nil {
1✔
UNCOV
88
                        lastErr = err
×
UNCOV
89
                        reqLogger.Info("drainNode(): Cordon failed, retrying", "error", err)
×
UNCOV
90
                        return false, nil
×
91
                }
×
92
                err = drain.RunNodeDrain(drainHelper, node.Name)
1✔
93
                if err == nil {
2✔
94
                        return true, nil
1✔
95
                }
1✔
96
                lastErr = err
×
97
                reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
×
98
                return false, nil
×
99
        }); err != nil {
×
UNCOV
100
                if wait.Interrupted(err) {
×
UNCOV
101
                        reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr)
×
UNCOV
102
                }
×
UNCOV
103
                reqLogger.Info("drainNode(): failed to drain node", "error", err)
×
UNCOV
104
                return false, err
×
105
        }
106
        reqLogger.Info("drainNode(): Drain completed")
1✔
107
        return true, nil
1✔
108
}
109

110
// CompleteDrainNode run un-cordon for the requested node
111
// for openshift system we also remove the pause from the machine config pool this node is part of
112
// only if we are the last draining node on that pool
113
func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (bool, error) {
1✔
114
        logger := log.FromContext(ctx)
1✔
115
        logger.Info("CompleteDrainNode:()")
1✔
116

1✔
117
        // Create drain helper object
1✔
118
        // full drain is not important here
1✔
119
        drainHelper := createDrainHelper(d.kubeClient, ctx, false)
1✔
120

1✔
121
        // run the un cordon function on the node
1✔
122
        if err := drain.RunCordonOrUncordon(drainHelper, node, false); err != nil {
1✔
UNCOV
123
                logger.Error(err, "failed to un-cordon the node")
×
UNCOV
124
                return false, err
×
UNCOV
125
        }
×
126

127
        // call the openshift complete drain to unpause the MCP
128
        // only if we are the last draining node in the pool
129
        completed, err := d.platformHelpers.OpenshiftAfterCompleteDrainNode(ctx, node)
1✔
130
        if err != nil {
1✔
UNCOV
131
                logger.Error(err, "failed to complete openshift draining")
×
UNCOV
132
                return false, err
×
UNCOV
133
        }
×
134

135
        logger.V(2).Info("CompleteDrainNode:()", "drainCompleted", completed)
1✔
136
        return completed, nil
1✔
137
}
138

139
// createDrainHelper function to create a drain helper
140
// if fullDrain is false we only remove pods that have the resourcePrefix
141
// if not we remove all the pods in the node
142
func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, fullDrain bool) *drain.Helper {
1✔
143
        logger := log.FromContext(ctx)
1✔
144
        drainer := &drain.Helper{
1✔
145
                Client:              kubeClient,
1✔
146
                Force:               true,
1✔
147
                IgnoreAllDaemonSets: true,
1✔
148
                DeleteEmptyDirData:  true,
1✔
149
                GracePeriodSeconds:  -1,
1✔
150
                Timeout:             90 * time.Second,
1✔
151
                OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
2✔
152
                        verbStr := constants.DrainDeleted
1✔
153
                        if usingEviction {
2✔
154
                                verbStr = constants.DrainEvicted
1✔
155
                        }
1✔
156
                        log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
1✔
157
                },
158
                Ctx:    ctx,
159
                Out:    writer{logger.Info},
160
                ErrOut: writer{func(msg string, kv ...interface{}) { logger.Error(nil, msg, kv...) }},
1✔
161
        }
162

163
        // when we just want to drain and not reboot we can only remove the pods using sriov devices
164
        if !fullDrain {
2✔
165
                deleteFunction := func(p corev1.Pod) drain.PodDeleteStatus {
1✔
166
                        for _, c := range p.Spec.Containers {
×
167
                                if c.Resources.Requests != nil {
×
168
                                        for r := range c.Resources.Requests {
×
169
                                                if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
×
170
                                                        return drain.PodDeleteStatus{
×
UNCOV
171
                                                                Delete:  true,
×
UNCOV
172
                                                                Reason:  "pod contain SR-IOV device",
×
UNCOV
173
                                                                Message: "SR-IOV network operator draining the node",
×
174
                                                        }
×
UNCOV
175
                                                }
×
176
                                        }
177
                                }
178
                        }
UNCOV
179
                        return drain.PodDeleteStatus{Delete: false}
×
180
                }
181

182
                drainer.AdditionalFilters = []drain.PodFilter{deleteFunction}
1✔
183
        }
184

185
        return drainer
1✔
186
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc