• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 11229466077

08 Oct 2024 05:59AM UTC coverage: 45.063% (-0.1%) from 45.177%
11229466077

Pull #666

github

web-flow
Merge 60432e00c into aecf4730f
Pull Request #666: Implement RDMA subsystem mode change

84 of 189 new or added lines in 11 files covered. (44.44%)

2 existing lines in 1 file now uncovered.

6700 of 14868 relevant lines covered (45.06%)

0.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/writer.go
1
package daemon
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "os"
8
        "path/filepath"
9
        "time"
10

11
        "github.com/pkg/errors"
12
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
        "k8s.io/apimachinery/pkg/util/wait"
14
        "k8s.io/client-go/util/retry"
15
        "sigs.k8s.io/controller-runtime/pkg/log"
16

17
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
18
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
19
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
20
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
21
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
22
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
23
)
24

25
const (
	// CheckpointFileName is the name of the file, placed under vars.Destdir,
	// in which the initial SriovNetworkNodeState of the node is checkpointed.
	CheckpointFileName = "sno-initial-node-state.json"
	// Unknown is the placeholder used in sync-status change events when a
	// status is empty.
	Unknown = "Unknown"
)
29

30
type NodeStateStatusWriter struct {
31
        client             snclientset.Interface
32
        status             sriovnetworkv1.SriovNetworkNodeStateStatus
33
        OnHeartbeatFailure func()
34
        platformHelper     platforms.Interface
35
        hostHelper         helper.HostHelpersInterface
36
        eventRecorder      *EventRecorder
37
}
38

39
// NewNodeStateStatusWriter Create a new NodeStateStatusWriter
40
func NewNodeStateStatusWriter(c snclientset.Interface,
41
        f func(), er *EventRecorder,
42
        hostHelper helper.HostHelpersInterface,
43
        platformHelper platforms.Interface) *NodeStateStatusWriter {
×
44
        return &NodeStateStatusWriter{
×
45
                client:             c,
×
46
                OnHeartbeatFailure: f,
×
47
                eventRecorder:      er,
×
48
                hostHelper:         hostHelper,
×
49
                platformHelper:     platformHelper,
×
50
        }
×
51
}
×
52

53
// RunOnce initial the interface status for both baremetal and virtual environments
54
func (w *NodeStateStatusWriter) RunOnce() error {
×
55
        log.Log.V(0).Info("RunOnce()")
×
56
        msg := Message{}
×
57

×
58
        if vars.PlatformType == consts.VirtualOpenStack {
×
59
                ns, err := w.getCheckPointNodeState()
×
60
                if err != nil {
×
61
                        return err
×
62
                }
×
63

64
                if ns == nil {
×
65
                        err = w.platformHelper.CreateOpenstackDevicesInfo()
×
66
                        if err != nil {
×
67
                                return err
×
68
                        }
×
69
                } else {
×
70
                        w.platformHelper.CreateOpenstackDevicesInfoFromNodeStatus(ns)
×
71
                }
×
72
        }
73

74
        log.Log.V(0).Info("RunOnce(): first poll for nic status")
×
75
        if err := w.pollNicStatus(); err != nil {
×
76
                log.Log.Error(err, "RunOnce(): first poll failed")
×
77
        }
×
78

79
        ns, err := w.setNodeStateStatus(msg)
×
80
        if err != nil {
×
81
                log.Log.Error(err, "RunOnce(): first writing to node status failed")
×
82
        }
×
83
        return w.writeCheckpointFile(ns)
×
84
}
85

86
// Run reads from the writer channel and sets the interface status. It will
87
// return if the stop channel is closed. Intended to be run via a goroutine.
88
func (w *NodeStateStatusWriter) Run(stop <-chan struct{}, refresh <-chan Message, syncCh chan<- struct{}) error {
×
89
        log.Log.V(0).Info("Run(): start writer")
×
90
        msg := Message{}
×
91

×
92
        for {
×
93
                select {
×
94
                case <-stop:
×
95
                        log.Log.V(0).Info("Run(): stop writer")
×
96
                        return nil
×
97
                case msg = <-refresh:
×
98
                        log.Log.V(0).Info("Run(): refresh trigger")
×
99
                        if err := w.pollNicStatus(); err != nil {
×
100
                                continue
×
101
                        }
102
                        _, err := w.setNodeStateStatus(msg)
×
103
                        if err != nil {
×
104
                                log.Log.Error(err, "Run() refresh: writing to node status failed")
×
105
                        }
×
106
                        syncCh <- struct{}{}
×
107
                case <-time.After(30 * time.Second):
×
108
                        log.Log.V(2).Info("Run(): period refresh")
×
109
                        if err := w.pollNicStatus(); err != nil {
×
110
                                continue
×
111
                        }
112
                        w.setNodeStateStatus(msg)
×
113
                }
114
        }
115
}
116

117
func (w *NodeStateStatusWriter) pollNicStatus() error {
×
118
        log.Log.V(2).Info("pollNicStatus()")
×
119
        var iface []sriovnetworkv1.InterfaceExt
×
120
        var bridges sriovnetworkv1.Bridges
×
NEW
121
        var rdmaMode string
×
122
        var err error
×
123

×
124
        if vars.PlatformType == consts.VirtualOpenStack {
×
125
                iface, err = w.platformHelper.DiscoverSriovDevicesVirtual()
×
126
                if err != nil {
×
127
                        return err
×
128
                }
×
129
        } else {
×
130
                iface, err = w.hostHelper.DiscoverSriovDevices(w.hostHelper)
×
131
                if err != nil {
×
132
                        return err
×
133
                }
×
134
                if vars.ManageSoftwareBridges {
×
135
                        bridges, err = w.hostHelper.DiscoverBridges()
×
136
                        if err != nil {
×
137
                                return err
×
138
                        }
×
139
                }
140
        }
141

NEW
142
        rdmaMode, err = w.hostHelper.DiscoverRDMASubsystem()
×
NEW
143
        if err != nil {
×
NEW
144
                return err
×
NEW
145
        }
×
146

147
        w.status.Interfaces = iface
×
148
        w.status.Bridges = bridges
×
NEW
149
        w.status.System.RdmaMode = rdmaMode
×
150

×
151
        return nil
×
152
}
153

154
// updateNodeStateStatusRetry fetches this node's SriovNetworkNodeState,
// applies f to it and writes the status back with UpdateStatus. The whole
// get-modify-update cycle is retried on conflict errors using
// retry.DefaultBackoff.
//
// On success it emits a status-change event if f changed SyncStatus, and
// returns the updated object. On failure it returns a nil object and an
// error that wraps the underlying cause.
func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv1.SriovNetworkNodeState)) (*sriovnetworkv1.SriovNetworkNodeState, error) {
	var nodeState *sriovnetworkv1.SriovNetworkNodeState
	var oldStatus, newStatus, lastError string

	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		n, getErr := w.getNodeState()
		if getErr != nil {
			return getErr
		}
		oldStatus = n.Status.SyncStatus

		// Call the status modifier.
		f(n)

		newStatus = n.Status.SyncStatus
		lastError = n.Status.LastSyncError

		var updateErr error
		nodeState, updateErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).UpdateStatus(context.Background(), n, metav1.UpdateOptions{})
		if updateErr != nil {
			log.Log.V(0).Error(updateErr, "updateNodeStateStatusRetry(): fail to update the node status")
		}
		return updateErr
	})
	if err != nil {
		// May be a conflict if max retries were hit. Identify the node by name:
		// nodeState holds only the result of a failed UpdateStatus call. Wrap
		// with %w so callers can still inspect the cause via errors.Is/As.
		return nil, fmt.Errorf("unable to update node %s: %w", vars.NodeName, err)
	}

	w.recordStatusChangeEvent(oldStatus, newStatus, lastError)

	return nodeState, nil
}
187

188
func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
189
        nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) {
×
190
                nodeState.Status.Interfaces = w.status.Interfaces
×
191
                nodeState.Status.Bridges = w.status.Bridges
×
192
                if msg.lastSyncError != "" || msg.syncStatus == consts.SyncStatusSucceeded {
×
193
                        // clear lastSyncError when sync Succeeded
×
194
                        nodeState.Status.LastSyncError = msg.lastSyncError
×
195
                }
×
196
                nodeState.Status.SyncStatus = msg.syncStatus
×
197

×
198
                log.Log.V(0).Info("setNodeStateStatus(): status",
×
199
                        "sync-status", nodeState.Status.SyncStatus,
×
200
                        "last-sync-error", nodeState.Status.LastSyncError)
×
201
        })
202
        if err != nil {
×
203
                return nil, err
×
204
        }
×
205
        return nodeState, nil
×
206
}
207

208
// recordStatusChangeEvent sends event in case oldStatus differs from newStatus
209
func (w *NodeStateStatusWriter) recordStatusChangeEvent(oldStatus, newStatus, lastError string) {
×
210
        if oldStatus != newStatus {
×
211
                if oldStatus == "" {
×
212
                        oldStatus = Unknown
×
213
                }
×
214
                if newStatus == "" {
×
215
                        newStatus = Unknown
×
216
                }
×
217
                eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus)
×
218
                if lastError != "" {
×
219
                        eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, lastError)
×
220
                }
×
221
                w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg)
×
222
        }
223
}
224

225
// getNodeState fetches this node's SriovNetworkNodeState from the API server.
// It polls immediately and then every 10 seconds, for up to 5 minutes. After
// each failed Get it calls OnHeartbeatFailure. On timeout the last Get error
// is returned, wrapped with the node name.
func (w *NodeStateStatusWriter) getNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) {
	var (
		state    *sriovnetworkv1.SriovNetworkNodeState
		fetchErr error
	)

	pollErr := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) {
		state, fetchErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
		if fetchErr != nil {
			log.Log.Error(fetchErr, "getNodeState(): Failed to fetch node state, close all connections and retry...", "name", vars.NodeName)
			// Use the Get() also as an client-go keepalive indicator for the TCP connection.
			w.OnHeartbeatFailure()
			return false, nil
		}
		return true, nil
	})

	switch {
	case pollErr == nil:
		return state, nil
	case pollErr == wait.ErrWaitTimeout:
		return nil, errors.Wrapf(fetchErr, "Timed out trying to fetch node %s", vars.NodeName)
	default:
		return nil, pollErr
	}
}
247

248
func (w *NodeStateStatusWriter) writeCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState) error {
×
249
        configdir := filepath.Join(vars.Destdir, CheckpointFileName)
×
250
        file, err := os.OpenFile(configdir, os.O_RDWR|os.O_CREATE, 0644)
×
251
        if err != nil {
×
252
                return err
×
253
        }
×
254
        defer file.Close()
×
255
        log.Log.Info("writeCheckpointFile(): try to decode the checkpoint file")
×
256
        if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil {
×
257
                log.Log.V(2).Error(err, "writeCheckpointFile(): fail to decode, writing new file instead")
×
258
                log.Log.Info("writeCheckpointFile(): write checkpoint file")
×
259
                if err = file.Truncate(0); err != nil {
×
260
                        return err
×
261
                }
×
262
                if _, err = file.Seek(0, 0); err != nil {
×
263
                        return err
×
264
                }
×
265
                if err = json.NewEncoder(file).Encode(*ns); err != nil {
×
266
                        return err
×
267
                }
×
268
                sriovnetworkv1.InitialState = *ns
×
269
        }
270
        return nil
×
271
}
272

273
func (w *NodeStateStatusWriter) getCheckPointNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
274
        log.Log.Info("getCheckPointNodeState()")
×
275
        configdir := filepath.Join(vars.Destdir, CheckpointFileName)
×
276
        file, err := os.OpenFile(configdir, os.O_RDONLY, 0644)
×
277
        if err != nil {
×
278
                if os.IsNotExist(err) {
×
279
                        return nil, nil
×
280
                }
×
281
                return nil, err
×
282
        }
283
        defer file.Close()
×
284
        if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil {
×
285
                return nil, err
×
286
        }
×
287

288
        return &sriovnetworkv1.InitialState, nil
×
289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc