• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 3751025296

pending completion
3751025296

Pull #365

github

GitHub
Merge 421284b55 into 788d76f7e
Pull Request #365: Implementation for new systemd configuration method

958 of 958 new or added lines in 18 files covered. (100.0%)

1971 of 8330 relevant lines covered (23.66%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/writer.go
1
package daemon
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "os"
8
        "path/filepath"
9
        "time"
10

11
        "github.com/golang/glog"
12
        "github.com/pkg/errors"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/util/wait"
15
        "k8s.io/client-go/util/retry"
16

17
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
18
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
19
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
20
)
21

22
const (
23
        CheckpointFileName = "sno-initial-node-state.json"
24
)
25

26
type NodeStateStatusWriter struct {
27
        client                 snclientset.Interface
28
        node                   string
29
        status                 sriovnetworkv1.SriovNetworkNodeStateStatus
30
        OnHeartbeatFailure     func()
31
        openStackDevicesInfo   utils.OSPDevicesInfo
32
        withUnsupportedDevices bool
33
}
34

35
// NewNodeStateStatusWriter Create a new NodeStateStatusWriter
36
func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), devMode bool) *NodeStateStatusWriter {
×
37
        return &NodeStateStatusWriter{
×
38
                client:                 c,
×
39
                node:                   n,
×
40
                OnHeartbeatFailure:     f,
×
41
                withUnsupportedDevices: devMode,
×
42
        }
×
43
}
×
44

45
// RunOnce initial the interface status for both baremetal and virtual environments
46
func (w *NodeStateStatusWriter) RunOnce(destDir string, platformType utils.PlatformType) error {
×
47
        glog.V(0).Infof("RunOnce()")
×
48
        msg := Message{}
×
49

×
50
        if platformType == utils.VirtualOpenStack {
×
51
                ns, err := w.getCheckPointNodeState(destDir)
×
52
                if err != nil {
×
53
                        return err
×
54
                }
×
55

56
                if ns == nil {
×
57
                        metaData, networkData, err := utils.GetOpenstackData(true)
×
58
                        if err != nil {
×
59
                                glog.Errorf("RunOnce(): failed to read OpenStack data: %v", err)
×
60
                        }
×
61

62
                        w.openStackDevicesInfo, err = utils.CreateOpenstackDevicesInfo(metaData, networkData)
×
63
                        if err != nil {
×
64
                                return err
×
65
                        }
×
66
                } else {
×
67
                        w.openStackDevicesInfo = utils.CreateOpenstackDevicesInfoFromNodeStatus(ns)
×
68
                }
×
69
        }
70

71
        glog.V(0).Info("RunOnce(): first poll for nic status")
×
72
        if err := w.pollNicStatus(platformType); err != nil {
×
73
                glog.Errorf("RunOnce(): first poll failed: %v", err)
×
74
        }
×
75

76
        ns, err := w.setNodeStateStatus(msg)
×
77
        if err != nil {
×
78
                glog.Errorf("RunOnce(): first writing to node status failed: %v", err)
×
79
        }
×
80
        return w.writeCheckpointFile(ns, destDir)
×
81
}
82

83
// Run reads from the writer channel and sets the interface status. It will
84
// return if the stop channel is closed. Intended to be run via a goroutine.
85
func (w *NodeStateStatusWriter) Run(stop <-chan struct{}, refresh <-chan Message, syncCh chan<- struct{}, platformType utils.PlatformType) error {
×
86
        glog.V(0).Infof("Run(): start writer")
×
87
        msg := Message{}
×
88

×
89
        for {
×
90
                select {
×
91
                case <-stop:
×
92
                        glog.V(0).Info("Run(): stop writer")
×
93
                        return nil
×
94
                case msg = <-refresh:
×
95
                        glog.V(0).Info("Run(): refresh trigger")
×
96
                        if err := w.pollNicStatus(platformType); err != nil {
×
97
                                continue
×
98
                        }
99
                        _, err := w.setNodeStateStatus(msg)
×
100
                        if err != nil {
×
101
                                glog.Errorf("Run() refresh: writing to node status failed: %v", err)
×
102
                        }
×
103

104
                        if msg.syncStatus == syncStatusSucceeded || msg.syncStatus == syncStatusFailed {
×
105
                                syncCh <- struct{}{}
×
106
                        }
×
107
                        //TODO: check if we need this at all
108
                        //case <-time.After(30 * time.Second):
109
                        //        glog.V(2).Info("Run(): period refresh")
110
                        //        if err := w.pollNicStatus(platformType); err != nil {
111
                        //                continue
112
                        //        }
113
                        //        w.setNodeStateStatus(msg)
114
                }
115
        }
116
}
117

118
func (w *NodeStateStatusWriter) pollNicStatus(platformType utils.PlatformType) error {
×
119
        glog.V(2).Info("pollNicStatus()")
×
120
        var iface []sriovnetworkv1.InterfaceExt
×
121
        var err error
×
122

×
123
        if platformType == utils.VirtualOpenStack {
×
124
                iface, err = utils.DiscoverSriovDevicesVirtual(w.openStackDevicesInfo)
×
125
        } else {
×
126
                iface, err = utils.DiscoverSriovDevices(w.withUnsupportedDevices)
×
127
        }
×
128
        if err != nil {
×
129
                return err
×
130
        }
×
131
        w.status.Interfaces = iface
×
132

×
133
        return nil
×
134
}
135

136
func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv1.SriovNetworkNodeState)) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
137
        var nodeState *sriovnetworkv1.SriovNetworkNodeState
×
138
        err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
×
139
                n, getErr := w.getNodeState()
×
140
                if getErr != nil {
×
141
                        return getErr
×
142
                }
×
143

144
                // Call the status modifier.
145
                f(n)
×
146

×
147
                var err error
×
148
                nodeState, err = w.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).UpdateStatus(context.Background(), n, metav1.UpdateOptions{})
×
149
                if err != nil {
×
150
                        glog.V(0).Infof("updateNodeStateStatusRetry(): fail to update the node status: %v", err)
×
151
                }
×
152
                return err
×
153
        })
154
        if err != nil {
×
155
                // may be conflict if max retries were hit
×
156
                return nil, fmt.Errorf("unable to update node %v: %v", nodeState, err)
×
157
        }
×
158

159
        return nodeState, nil
×
160
}
161

162
func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
163
        nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) {
×
164
                nodeState.Status.Interfaces = w.status.Interfaces
×
165
                if msg.lastSyncError != "" || msg.syncStatus == syncStatusSucceeded {
×
166
                        // clear lastSyncError when sync Succeeded
×
167
                        nodeState.Status.LastSyncError = msg.lastSyncError
×
168
                }
×
169
                nodeState.Status.SyncStatus = msg.syncStatus
×
170

×
171
                glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)
×
172
        })
173
        if err != nil {
×
174
                return nil, err
×
175
        }
×
176
        return nodeState, nil
×
177
}
178

179
// getNodeState queries the kube apiserver to get the SriovNetworkNodeState CR
180
func (w *NodeStateStatusWriter) getNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
181
        var lastErr error
×
182
        var n *sriovnetworkv1.SriovNetworkNodeState
×
183
        err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) {
×
184
                n, lastErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get(context.Background(), w.node, metav1.GetOptions{})
×
185
                if lastErr == nil {
×
186
                        return true, nil
×
187
                }
×
188
                glog.Warningf("getNodeState(): Failed to fetch node state %s (%v); close all connections and retry...", w.node, lastErr)
×
189
                // Use the Get() also as an client-go keepalive indicator for the TCP connection.
×
190
                w.OnHeartbeatFailure()
×
191
                return false, nil
×
192
        })
193
        if err != nil {
×
194
                if err == wait.ErrWaitTimeout {
×
195
                        return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", w.node)
×
196
                }
×
197
                return nil, err
×
198
        }
199
        return n, nil
×
200
}
201

202
func (w *NodeStateStatusWriter) writeCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState, destDir string) error {
×
203
        configdir := filepath.Join(destDir, CheckpointFileName)
×
204
        file, err := os.OpenFile(configdir, os.O_RDWR|os.O_CREATE, 0644)
×
205
        if err != nil {
×
206
                return err
×
207
        }
×
208
        defer file.Close()
×
209
        glog.Info("writeCheckpointFile(): try to decode the checkpoint file")
×
210
        if err = json.NewDecoder(file).Decode(&utils.InitialState); err != nil {
×
211
                glog.V(2).Infof("writeCheckpointFile(): fail to decode: %v", err)
×
212
                glog.Info("writeCheckpointFile(): write checkpoint file")
×
213
                if err = file.Truncate(0); err != nil {
×
214
                        return err
×
215
                }
×
216
                if _, err = file.Seek(0, 0); err != nil {
×
217
                        return err
×
218
                }
×
219
                if err = json.NewEncoder(file).Encode(*ns); err != nil {
×
220
                        return err
×
221
                }
×
222
                utils.InitialState = *ns
×
223
        }
224
        return nil
×
225
}
226

227
func (w *NodeStateStatusWriter) getCheckPointNodeState(destDir string) (*sriovnetworkv1.SriovNetworkNodeState, error) {
×
228
        glog.Infof("getCheckPointNodeState()")
×
229
        configdir := filepath.Join(destDir, CheckpointFileName)
×
230
        file, err := os.OpenFile(configdir, os.O_RDONLY, 0644)
×
231
        if err != nil {
×
232
                if os.IsNotExist(err) {
×
233
                        return nil, nil
×
234
                }
×
235
                return nil, err
×
236
        }
237
        defer file.Close()
×
238
        if err = json.NewDecoder(file).Decode(&utils.InitialState); err != nil {
×
239
                return nil, err
×
240
        }
×
241

242
        return &utils.InitialState, nil
×
243
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc