• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 7913628099

15 Feb 2024 09:18AM UTC coverage: 30.287% (+0.1%) from 30.177%
7913628099

Pull #635

github

web-flow
Merge f9cf91565 into 1163ef9d1
Pull Request #635: Optimize refreshing SriovNetworkNodeState

9 of 16 new or added lines in 2 files covered. (56.25%)

3 existing lines in 1 file now uncovered.

3562 of 11761 relevant lines covered (30.29%)

0.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/writer.go
1
package daemon
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "os"
8
        "path/filepath"
9
        "reflect"
10
        "time"
11

12
        "github.com/pkg/errors"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/util/wait"
15
        "k8s.io/client-go/util/retry"
16
        "sigs.k8s.io/controller-runtime/pkg/log"
17

18
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
19
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
20
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
21
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
22
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
23
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
24
)
25

26
// Constants used by the node state status writer.
const (
27
        // CheckpointFileName is the file name that is joined with vars.Destdir
        // to locate the checkpoint file read and written by this package.
        CheckpointFileName = "sno-initial-node-state.json"
28
        // Unknown replaces an empty sync status in status-change event messages.
        Unknown            = "Unknown"
29
)
30

31
// NodeStateStatusWriter discovers the SR-IOV interfaces of the node and
// writes them, together with the sync status, into the status of the
// SriovNetworkNodeState object.
type NodeStateStatusWriter struct {
32
        // client is used to get the SriovNetworkNodeState and update its status.
        client             snclientset.Interface
33
        // status caches the most recently discovered interfaces (see pollNicStatus).
        status             sriovnetworkv1.SriovNetworkNodeStateStatus
34
        // OnHeartbeatFailure is invoked each time fetching the node state fails.
        OnHeartbeatFailure func()
35
        // platformHelper provides device discovery on the VirtualOpenStack platform.
        platformHelper     platforms.Interface
36
        // hostHelper provides device discovery on all other platforms.
        hostHelper         helper.HostHelpersInterface
37
        // eventRecorder emits the "SyncStatusChanged" events.
        eventRecorder      *EventRecorder
38
}
39

40
// NewNodeStateStatusWriter Create a new NodeStateStatusWriter
41
func NewNodeStateStatusWriter(c snclientset.Interface,
42
        f func(), er *EventRecorder,
43
        hostHelper helper.HostHelpersInterface,
44
        platformHelper platforms.Interface) *NodeStateStatusWriter {
×
45
        return &NodeStateStatusWriter{
×
46
                client:             c,
×
47
                OnHeartbeatFailure: f,
×
48
                eventRecorder:      er,
×
49
                hostHelper:         hostHelper,
×
50
                platformHelper:     platformHelper,
×
51
        }
×
52
}
×
53

54
// RunOnce initial the interface status for both baremetal and virtual environments
55
func (w *NodeStateStatusWriter) RunOnce() error {
×
56
        log.Log.V(0).Info("RunOnce()")
×
57
        msg := Message{}
×
58

×
59
        if vars.PlatformType == consts.VirtualOpenStack {
×
60
                ns, err := w.getCheckPointNodeState()
×
61
                if err != nil {
×
62
                        return err
×
63
                }
×
64

65
                if ns == nil {
×
66
                        err = w.platformHelper.CreateOpenstackDevicesInfo()
×
67
                        if err != nil {
×
68
                                return err
×
69
                        }
×
70
                } else {
×
71
                        w.platformHelper.CreateOpenstackDevicesInfoFromNodeStatus(ns)
×
72
                }
×
73
        }
74

75
        log.Log.V(0).Info("RunOnce(): first poll for nic status")
×
76
        if err := w.pollNicStatus(); err != nil {
×
77
                log.Log.Error(err, "RunOnce(): first poll failed")
×
78
        }
×
79

80
        ns, err := w.setNodeStateStatus(msg)
×
81
        if err != nil {
×
82
                log.Log.Error(err, "RunOnce(): first writing to node status failed")
×
83
        }
×
84
        return w.writeCheckpointFile(ns)
×
85
}
86

87
// Run reads from the writer channel and sets the interface status. It will
88
// return if the stop channel is closed. Intended to be run via a goroutine.
89
func (w *NodeStateStatusWriter) Run(stop <-chan struct{}, refresh <-chan Message, syncCh chan<- struct{}) error {
×
90
        log.Log.V(0).Info("Run(): start writer")
×
91
        msg := Message{}
×
NEW
92
        lastInterfaces := w.status.Interfaces
×
93

×
94
        for {
×
95
                select {
×
96
                case <-stop:
×
97
                        log.Log.V(0).Info("Run(): stop writer")
×
98
                        return nil
×
99
                case msg = <-refresh:
×
100
                        log.Log.V(0).Info("Run(): refresh trigger")
×
101
                        if err := w.pollNicStatus(); err != nil {
×
102
                                continue
×
103
                        }
NEW
104
                        lastInterfaces = w.status.Interfaces
×
105
                        _, err := w.setNodeStateStatus(msg)
×
106
                        if err != nil {
×
107
                                log.Log.Error(err, "Run() refresh: writing to node status failed")
×
108
                        }
×
109
                        syncCh <- struct{}{}
×
110
                case <-time.After(30 * time.Second):
×
111
                        log.Log.V(2).Info("Run(): period refresh")
×
112
                        if err := w.pollNicStatus(); err != nil {
×
113
                                continue
×
114
                        }
NEW
115
                        if !reflect.DeepEqual(lastInterfaces, w.status.Interfaces) {
×
NEW
116
                                lastInterfaces = w.status.Interfaces
×
NEW
117
                                w.setNodeStateStatus(msg)
×
NEW
118
                        }
×
119
                }
120
        }
121
}
122

123
func (w *NodeStateStatusWriter) pollNicStatus() error {
×
124
        log.Log.V(2).Info("pollNicStatus()")
×
125
        var iface []sriovnetworkv1.InterfaceExt
×
126
        var err error
×
127

×
128
        if vars.PlatformType == consts.VirtualOpenStack {
×
129
                iface, err = w.platformHelper.DiscoverSriovDevicesVirtual()
×
130
        } else {
×
131
                iface, err = w.hostHelper.DiscoverSriovDevices(w.hostHelper)
×
132
        }
×
133
        if err != nil {
×
134
                return err
×
135
        }
×
136
        w.status.Interfaces = iface
×
137

×
138
        return nil
×
139
}
140

141
func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv1.SriovNetworkNodeState)) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
142
        var nodeState *sriovnetworkv1.SriovNetworkNodeState
×
143
        var oldStatus, newStatus, lastError string
×
144

×
145
        err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
×
146
                n, getErr := w.getNodeState()
×
147
                if getErr != nil {
×
148
                        return getErr
×
149
                }
×
150
                oldStatus = n.Status.SyncStatus
×
151

×
152
                // Call the status modifier.
×
153
                f(n)
×
154

×
155
                newStatus = n.Status.SyncStatus
×
156
                lastError = n.Status.LastSyncError
×
157

×
158
                var err error
×
159
                nodeState, err = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).UpdateStatus(context.Background(), n, metav1.UpdateOptions{})
×
160
                if err != nil {
×
161
                        log.Log.V(0).Error(err, "updateNodeStateStatusRetry(): fail to update the node status")
×
162
                }
×
163
                return err
×
164
        })
165
        if err != nil {
×
166
                // may be conflict if max retries were hit
×
167
                return nil, fmt.Errorf("unable to update node %v: %v", nodeState, err)
×
168
        }
×
169

170
        w.recordStatusChangeEvent(oldStatus, newStatus, lastError)
×
171

×
172
        return nodeState, nil
×
173
}
174

175
func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
176
        nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) {
×
177
                nodeState.Status.Interfaces = w.status.Interfaces
×
178
                if msg.lastSyncError != "" || msg.syncStatus == consts.SyncStatusSucceeded {
×
179
                        // clear lastSyncError when sync Succeeded
×
180
                        nodeState.Status.LastSyncError = msg.lastSyncError
×
181
                }
×
182
                nodeState.Status.SyncStatus = msg.syncStatus
×
183

×
184
                log.Log.V(0).Info("setNodeStateStatus(): status",
×
185
                        "sync-status", nodeState.Status.SyncStatus,
×
186
                        "last-sync-error", nodeState.Status.LastSyncError)
×
187
        })
188
        if err != nil {
×
189
                return nil, err
×
190
        }
×
191
        return nodeState, nil
×
192
}
193

194
// recordStatusChangeEvent sends event in case oldStatus differs from newStatus
195
func (w *NodeStateStatusWriter) recordStatusChangeEvent(oldStatus, newStatus, lastError string) {
×
196
        if oldStatus != newStatus {
×
197
                if oldStatus == "" {
×
198
                        oldStatus = Unknown
×
199
                }
×
200
                if newStatus == "" {
×
201
                        newStatus = Unknown
×
202
                }
×
203
                eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus)
×
204
                if lastError != "" {
×
205
                        eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, lastError)
×
206
                }
×
207
                w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg)
×
208
        }
209
}
210

211
// getNodeState queries the kube apiserver to get the SriovNetworkNodeState CR
212
func (w *NodeStateStatusWriter) getNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
213
        var lastErr error
×
214
        var n *sriovnetworkv1.SriovNetworkNodeState
×
215
        err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) {
×
NEW
216
                n, lastErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{ResourceVersion: "0"})
×
217
                if lastErr == nil {
×
218
                        return true, nil
×
219
                }
×
220
                log.Log.Error(lastErr, "getNodeState(): Failed to fetch node state, close all connections and retry...", "name", vars.NodeName)
×
221
                // Use the Get() also as an client-go keepalive indicator for the TCP connection.
×
222
                w.OnHeartbeatFailure()
×
223
                return false, nil
×
224
        })
225
        if err != nil {
×
226
                if err == wait.ErrWaitTimeout {
×
227
                        return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", vars.NodeName)
×
228
                }
×
229
                return nil, err
×
230
        }
231
        return n, nil
×
232
}
233

234
func (w *NodeStateStatusWriter) writeCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState) error {
×
235
        configdir := filepath.Join(vars.Destdir, CheckpointFileName)
×
236
        file, err := os.OpenFile(configdir, os.O_RDWR|os.O_CREATE, 0644)
×
237
        if err != nil {
×
238
                return err
×
239
        }
×
240
        defer file.Close()
×
241
        log.Log.Info("writeCheckpointFile(): try to decode the checkpoint file")
×
242
        if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil {
×
243
                log.Log.V(2).Error(err, "writeCheckpointFile(): fail to decode, writing new file instead")
×
244
                log.Log.Info("writeCheckpointFile(): write checkpoint file")
×
245
                if err = file.Truncate(0); err != nil {
×
246
                        return err
×
247
                }
×
248
                if _, err = file.Seek(0, 0); err != nil {
×
249
                        return err
×
250
                }
×
251
                if err = json.NewEncoder(file).Encode(*ns); err != nil {
×
252
                        return err
×
253
                }
×
254
                sriovnetworkv1.InitialState = *ns
×
255
        }
256
        return nil
×
257
}
258

259
func (w *NodeStateStatusWriter) getCheckPointNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
260
        log.Log.Info("getCheckPointNodeState()")
×
261
        configdir := filepath.Join(vars.Destdir, CheckpointFileName)
×
262
        file, err := os.OpenFile(configdir, os.O_RDONLY, 0644)
×
263
        if err != nil {
×
264
                if os.IsNotExist(err) {
×
265
                        return nil, nil
×
266
                }
×
267
                return nil, err
×
268
        }
269
        defer file.Close()
×
270
        if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil {
×
271
                return nil, err
×
272
        }
×
273

274
        return &sriovnetworkv1.InitialState, nil
×
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc