k8snetworkplumbingwg / sriov-network-operator, build 11229466077

08 Oct 2024 05:59AM UTC coverage: 45.063% (-0.1%) from 45.177%

Pull Request #666: Implement RDMA subsystem mode change (merge 60432e00c into aecf4730f)

84 of 189 new or added lines in 11 files covered. (44.44%)

2 existing lines in 1 file now uncovered.

6700 of 14868 relevant lines covered (45.06%)

0.5 hits per line

Source File: /pkg/daemon/daemon.go (44.15% covered)
package daemon

import (
        "context"
        "fmt"
        "math/rand"
        "os/exec"
        "reflect"
        "sync"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/log"

        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
        sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
        snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
        plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
)

const (
        // updateDelay is the baseline speed at which we react to changes. We don't
        // need to react in milliseconds as any change would involve rebooting the node.
        updateDelay = 5 * time.Second
        // maxUpdateBackoff is the maximum time to react to a change as we back off
        // in the face of errors.
        maxUpdateBackoff = 60 * time.Second
)

type Message struct {
        syncStatus    string
        lastSyncError string
}

type Daemon struct {
        client client.Client

        sriovClient snclientset.Interface
        // kubeClient allows interaction with Kubernetes, including the node we are running on.
        kubeClient kubernetes.Interface

        desiredNodeState *sriovnetworkv1.SriovNetworkNodeState
        currentNodeState *sriovnetworkv1.SriovNetworkNodeState

        // list of disabled plugins
        disabledPlugins []string

        loadedPlugins map[string]plugin.VendorPlugin

        HostHelpers helper.HostHelpersInterface

        platformHelpers platforms.Interface

        // channel used by callbacks to signal Run() of an error
        exitCh chan<- error

        // channel used to ensure all spawned goroutines exit when we exit.
        stopCh <-chan struct{}

        syncCh <-chan struct{}

        refreshCh chan<- Message

        mu *sync.Mutex

        disableDrain bool

        workqueue workqueue.RateLimitingInterface

        eventRecorder *EventRecorder

        featureGate featuregate.FeatureGate
}

func New(
        client client.Client,
        sriovClient snclientset.Interface,
        kubeClient kubernetes.Interface,
        hostHelpers helper.HostHelpersInterface,
        platformHelper platforms.Interface,
        exitCh chan<- error,
        stopCh <-chan struct{},
        syncCh <-chan struct{},
        refreshCh chan<- Message,
        er *EventRecorder,
        featureGates featuregate.FeatureGate,
        disabledPlugins []string,
) *Daemon {
        return &Daemon{
                client:           client,
                sriovClient:      sriovClient,
                kubeClient:       kubeClient,
                HostHelpers:      hostHelpers,
                platformHelpers:  platformHelper,
                exitCh:           exitCh,
                stopCh:           stopCh,
                syncCh:           syncCh,
                refreshCh:        refreshCh,
                desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
                        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
                        workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
                eventRecorder:   er,
                featureGate:     featureGates,
                disabledPlugins: disabledPlugins,
        }
}

// Run the config daemon
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
        log.Log.V(0).Info("Run()", "node", vars.NodeName)

        if vars.ClusterType == consts.ClusterTypeOpenshift {
                log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor())
        } else {
                log.Log.V(0).Info("Run(): start daemon.")
        }

        if !vars.UsingSystemdMode {
                log.Log.V(0).Info("Run(): daemon running in daemon mode")
                dn.HostHelpers.CheckRDMAEnabled()
                dn.HostHelpers.TryEnableTun()
                dn.HostHelpers.TryEnableVhostNet()
                err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
                if err != nil {
                        log.Log.Error(err, "failed to remove all the systemd sriov files")
                }
        } else {
                log.Log.V(0).Info("Run(): daemon running in systemd mode")
        }

        // Only watch own SriovNetworkNodeState CR
        defer utilruntime.HandleCrash()
        defer dn.workqueue.ShutDown()

        if err := dn.prepareNMUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
        }
        if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs")
        }

        var timeout int64 = 5
        var metadataKey = "metadata.name"
        dn.mu = &sync.Mutex{}
        informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*15,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + vars.NodeName
                        lo.TimeoutSeconds = &timeout
                },
        )

        informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer()
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: dn.enqueueNodeState,
                UpdateFunc: func(old, new interface{}) {
                        dn.enqueueNodeState(new)
                },
        })

        cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*30,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + "default"
                },
        )

        cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer()
        cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    dn.operatorConfigAddHandler,
                UpdateFunc: dn.operatorConfigChangeHandler,
        })

        rand.Seed(time.Now().UnixNano())
        go cfgInformer.Run(dn.stopCh)
        time.Sleep(5 * time.Second)
        go informer.Run(dn.stopCh)
        if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
        }

        log.Log.Info("Starting workers")
        // Launch one worker to process
        go wait.Until(dn.runWorker, time.Second, stopCh)
        log.Log.Info("Started workers")

        for {
                select {
                case <-stopCh:
                        log.Log.V(0).Info("Run(): stop daemon")
                        return nil
                case err, more := <-exitCh:
                        log.Log.Error(err, "got an error")
                        if more {
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: err.Error(),
                                }
                        }
                        return err
                }
        }
}

func (dn *Daemon) runWorker() {
        for dn.processNextWorkItem() {
        }
}

func (dn *Daemon) enqueueNodeState(obj interface{}) {
        var ns *sriovnetworkv1.SriovNetworkNodeState
        var ok bool
        if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok {
                utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj))
                return
        }
        key := ns.GetGeneration()
        dn.workqueue.Add(key)
}

func (dn *Daemon) processNextWorkItem() bool {
        log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len())
        obj, shutdown := dn.workqueue.Get()
        if shutdown {
                return false
        }

        log.Log.V(2).Info("get item from queue", "item", obj.(int64))

        // We wrap this block in a func so we can defer c.workqueue.Done.
        err := func(obj interface{}) error {
                // We call Done here so the workqueue knows we have finished
                // processing this item.
                defer dn.workqueue.Done(obj)
                var key int64
                var ok bool
                if key, ok = obj.(int64); !ok {
                        // As the item in the workqueue is actually invalid, we call
                        // Forget here.
                        dn.workqueue.Forget(obj)
                        utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
                        return nil
                }

                err := dn.nodeStateSyncHandler()
                if err != nil {
                        // Report the error message, and put the item back on the work queue for retry.
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusFailed,
                                lastSyncError: err.Error(),
                        }
                        <-dn.syncCh
                        dn.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing: %s, requeuing", err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                dn.workqueue.Forget(obj)
                log.Log.Info("Successfully synced")
                return nil
        }(obj)

        if err != nil {
                utilruntime.HandleError(err)
        }

        return true
}

func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
        dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj)
}

func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) {
        oldCfg := old.(*sriovnetworkv1.SriovOperatorConfig)
        newCfg := new.(*sriovnetworkv1.SriovOperatorConfig)
        if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName {
                log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name)
                return
        }

        snolog.SetLogLevel(newCfg.Spec.LogLevel)

        newDisableDrain := newCfg.Spec.DisableDrain
        if dn.disableDrain != newDisableDrain {
                dn.disableDrain = newDisableDrain
                log.Log.Info("Set Disable Drain", "value", dn.disableDrain)
        }

        if !reflect.DeepEqual(oldCfg.Spec.FeatureGates, newCfg.Spec.FeatureGates) {
                dn.featureGate.Init(newCfg.Spec.FeatureGates)
                log.Log.Info("Updated featureGates", "featureGates", dn.featureGate.String())
        }

        vars.MlxPluginFwReset = dn.featureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate)
}

func (dn *Daemon) nodeStateSyncHandler() error {
        var err error
        // Get the latest NodeState
        var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""}
        dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        latest := dn.desiredNodeState.GetGeneration()
        log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest)

        // load the plugins if they have not been loaded yet
        if len(dn.loadedPlugins) == 0 {
                dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins")
                        return err
                }
        }

        skipReconciliation := true
        // if the operator completed the drain operation we should continue with the configuration
        if !dn.isDrainCompleted() {
                if vars.UsingSystemdMode && dn.currentNodeState.GetGeneration() == latest {
                        serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host")
                                return err
                        }
                        postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host")
                                return err
                        }

                        // if the services don't exist we should continue and let the k8s plugin create the service files.
                        // This is only for k8s-based environments; for openshift the sriov-operator creates a machine config that applies
                        // the system service and reboots the node, so the config-daemon doesn't need to do anything.
                        if !(serviceEnabled && postNetworkServiceEnabled) {
                                sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed,
                                        LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+
                                                "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)}
                        } else {
                                sriovResult, err = systemd.ReadSriovResult()
                                if err != nil {
                                        log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host")
                                        return err
                                }
                        }
                        if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed {
                                log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError)

                                // add the error but don't requeue
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: sriovResult.LastSyncError,
                                }
                                <-dn.syncCh
                                return nil
                        }
                }

                skipReconciliation, err = dn.shouldSkipReconciliation(dn.desiredNodeState)
                if err != nil {
                        return err
                }
        }

        // we are done with the configuration, just return here
        if dn.currentNodeState.GetGeneration() == dn.desiredNodeState.GetGeneration() &&
                dn.desiredNodeState.Status.SyncStatus == consts.SyncStatusSucceeded && skipReconciliation {
                log.Log.Info("Current state and desire state are equal together with sync status succeeded nothing to do")
                return nil
        }

        dn.refreshCh <- Message{
                syncStatus:    consts.SyncStatusInProgress,
                lastSyncError: "",
        }
        // wait for writer to refresh the status, then pull the latest node state again
        <-dn.syncCh

        // we need to load the latest status into our object;
        // if we don't, we can have a race here where the user removed the virtual functions but the operator didn't
        // trigger the refresh
        updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        dn.desiredNodeState.Status = updatedState.Status

        reqReboot := false
        reqDrain := false

        // check if any of the plugins require us to drain or reboot the node
        for k, p := range dn.loadedPlugins {
                d, r := false, false
                if dn.currentNodeState.GetName() == "" {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state")
                } else {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state")
                }
                d, r, err = p.OnNodeStateChange(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k)
                        return err
                }
                log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r)
                reqDrain = reqDrain || d
                reqReboot = reqReboot || r
        }

        // new in this change (Pull Request #666): switch the RDMA subsystem mode when the desired spec differs;
        // this requires both a drain and a reboot of the node
        if dn.currentNodeState.Status.System.RdmaMode != dn.desiredNodeState.Spec.System.RdmaMode {
                err = dn.HostHelpers.SetRDMASubsystem(dn.desiredNodeState.Spec.System.RdmaMode)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to set RDMA subsystem")
                        return err
                }
                reqReboot = true
                reqDrain = true
        }

        // When running in systemd mode check if the applied configuration is the latest one,
        // or whether there is a new config we need to apply.
        // When using the systemd configuration we write the file here.
        if vars.UsingSystemdMode {
                log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host")
                systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode")
                        return err
                }
                if systemdConfModified {
                        // remove the existing result file to make sure that we will not use an outdated result, e.g. in case
                        // the systemd service was not triggered for some reason
                        err = systemd.RemoveSriovResult()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode")
                                return err
                        }
                }
                reqDrain = reqDrain || systemdConfModified
                // require a reboot if a drain is needed in systemd mode
                reqReboot = reqReboot || systemdConfModified || reqDrain
                log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results",
                        "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

                err = systemd.WriteSriovSupportedNics()
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode")
                        return err
                }
        }

        log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon",
                "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

        // handle the drain only if a plugin requested it, or we are already in a drain request state
        if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState,
                consts.NodeStateDrainAnnotationCurrent,
                consts.DrainIdle) {
                drainInProcess, err := dn.handleDrain(reqReboot)
                if err != nil {
                        log.Log.Error(err, "failed to handle drain")
                        return err
                }
                if drainInProcess {
                        return nil
                }
        }

        // apply the vendor plugins after we are done with the drain if needed
        for k, p := range dn.loadedPlugins {
                // Skip both the generic and the virtual plugin; they are applied last
                if k != GenericPluginName && k != VirtualPluginName {
                        err := p.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k)
                                return err
                        }
                }
        }

        // if we don't need to reboot, and we are not doing the configuration in systemd,
        // we apply the generic plugin
        if !reqReboot && !vars.UsingSystemdMode {
                // For BareMetal machines apply the generic plugin
                selectedPlugin, ok := dn.loadedPlugins[GenericPluginName]
                if ok {
                        // Apply generic plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply")
                                return err
                        }
                }

                // For Virtual machines apply the virtual plugin
                selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName]
                if ok {
                        // Apply virtual plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply")
                                return err
                        }
                }
        }

        if reqReboot {
                log.Log.Info("nodeStateSyncHandler(): reboot node")
                dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
                dn.rebootNode()
                return nil
        }

        // restart the device plugin pod
        log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
        if err := dn.restartDevicePluginPod(); err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node")
        err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client)
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainIdle, dn.client); err != nil {
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): sync succeeded")
        dn.currentNodeState = dn.desiredNodeState.DeepCopy()
        if vars.UsingSystemdMode {
                dn.refreshCh <- Message{
                        syncStatus:    sriovResult.SyncStatus,
                        lastSyncError: sriovResult.LastSyncError,
                }
        } else {
                dn.refreshCh <- Message{
                        syncStatus:    consts.SyncStatusSucceeded,
                        lastSyncError: "",
                }
        }
        // wait for writer to refresh the status
        <-dn.syncCh
        return nil
}

func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) {
        log.Log.V(0).Info("shouldSkipReconciliation()")
        var err error

        // Skip when SriovNetworkNodeState object has just been created.
        if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 {
                err = dn.HostHelpers.ClearPCIAddressFolder()
                if err != nil {
                        log.Log.Error(err, "failed to clear the PCI address configuration")
                        return false, err
                }

                log.Log.V(0).Info(
                        "shouldSkipReconciliation(): interface policy spec not yet set by controller for sriovNetworkNodeState",
                        "name", latestState.Name)
                if latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for writer to refresh status
                        <-dn.syncCh
                }
                return true, nil
        }

        // Verify changes in the status of the SriovNetworkNodeState CR.
        if dn.currentNodeState.GetGeneration() == latestState.GetGeneration() {
                log.Log.V(0).Info("shouldSkipReconciliation() verifying status change")
                for _, p := range dn.loadedPlugins {
                        // Verify changes in the status of the SriovNetworkNodeState CR.
                        log.Log.V(0).Info("shouldSkipReconciliation(): verifying status change for plugin", "pluginName", p.Name())
                        changed, err := p.CheckStatusChanges(latestState)
                        if err != nil {
                                return false, err
                        }
                        if changed {
                                log.Log.V(0).Info("shouldSkipReconciliation(): plugin require change", "pluginName", p.Name())
                                return false, nil
                        }
                }

                log.Log.V(0).Info("shouldSkipReconciliation(): Interface not changed")
                if latestState.Status.LastSyncError != "" ||
                        latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for writer to refresh the status
                        <-dn.syncCh
                }

                return true, nil
        }

        return false, nil
}

// handleDrain: adds the right annotation to the node and nodeState object
// returns true if we need to finish the reconcile loop and wait for a new object
func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) {
        // done with the drain, we can continue with the configuration
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
                log.Log.Info("handleDrain(): the node complete the draining")
                return false, nil
        }

        // the operator is still draining the node, so we reconcile
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
                log.Log.Info("handleDrain(): the node is still draining")
                return true, nil
        }

        // drain is disabled, we continue with the configuration
        if dn.disableDrain {
                log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig")
                return false, nil
        }

        if reqReboot {
                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node")
                err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client)
                if err != nil {
                        log.Log.Error(err, "applyDrainRequired(): Failed to annotate node")
                        return false, err
                }

                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState")
                if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                        consts.NodeStateDrainAnnotation,
                        consts.RebootRequired, dn.client); err != nil {
                        return false, err
                }

                // the node was annotated, we need to wait for the operator to finish the drain
                return true, nil
        }
        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node")
        err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client)
        if err != nil {
                log.Log.Error(err, "handleDrain(): Failed to annotate node")
                return false, err
        }

        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainRequired, dn.client); err != nil {
                return false, err
        }

        // the node was annotated, we need to wait for the operator to finish the drain
        return true, nil
}

func (dn *Daemon) restartDevicePluginPod() error {
        dn.mu.Lock()
        defer dn.mu.Unlock()
        log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")

        var podToDelete string
        pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
                LabelSelector:   "app=sriov-device-plugin",
                FieldSelector:   "spec.nodeName=" + vars.NodeName,
                ResourceVersion: "0",
        })
        if err != nil {
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return nil
                }
                log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
                return err
        }

        if len(pods.Items) == 0 {
                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                return nil
        }
        podToDelete = pods.Items[0].Name

        log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
        err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
        if errors.IsNotFound(err) {
                log.Log.Info("restartDevicePluginPod(): pod to delete not found")
                return nil
        }
        if err != nil {
                log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
                return err
        }

        if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
                _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return true, nil
                }

                if err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
                } else {
                        log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
                }
                return false, nil
        }, dn.stopCh); err != nil {
                log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
                return err
        }

        return nil
}

func (dn *Daemon) rebootNode() {
        log.Log.Info("rebootNode(): trigger node reboot")
        exit, err := dn.HostHelpers.Chroot(consts.Host)
        if err != nil {
                log.Log.Error(err, "rebootNode(): chroot command failed")
        }
        defer exit()
        // creates a new transient systemd unit to reboot the system.
        // We explicitly try to stop kubelet.service first, before anything else; this
        // way we ensure the rest of the system stays running, because kubelet may need
        // to do a "graceful" shutdown by e.g. de-registering with a load balancer.
        // However note we use `;` instead of `&&` so we keep rebooting even
        // if kubelet failed to shut down - that way the machine will still eventually reboot
        // as systemd will time out the stop invocation.
        cmd := exec.Command("systemd-run", "--unit", "sriov-network-config-daemon-reboot",
                "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")

        if err := cmd.Run(); err != nil {
                log.Log.Error(err, "failed to reboot node")
        }
}

func (dn *Daemon) prepareNMUdevRule() error {
        // we need to remove the Red Hat Virtio network device from the udev rule configuration:
        // if we don't remove it, running the config-daemon on a virtual node will disconnect the node after a reboot.
        // Even though the operator should not be installed on virtual environments that are not openstack,
        // we should not destroy the cluster if the operator is installed there
        supportedVfIds := []string{}
        for _, vfID := range sriovnetworkv1.GetSupportedVfIds() {
                if vfID == "0x1000" || vfID == "0x1041" {
                        continue
                }
                supportedVfIds = append(supportedVfIds, vfID)
        }

        return dn.HostHelpers.PrepareNMUdevRule(supportedVfIds)
}

// isDrainCompleted returns true if the current-state annotation is drain completed
func (dn *Daemon) isDrainCompleted() bool {
        return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete)
}
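
Note on the retry pacing used in New(): the work queue combines a steady token-bucket limiter with a per-item exponential-backoff limiter through workqueue.NewMaxOfRateLimiter, so retries for a failing SriovNetworkNodeState generation back off up to maxUpdateBackoff, while a successful sync resets the backoff via Forget. The stand-alone sketch below (not part of daemon.go; the 1s/5s/60s values and the rate.Every configuration are illustrative stand-ins rather than the operator's exact expression) shows that behaviour with the same client-go types:

package main

import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/client-go/util/workqueue"
)

func main() {
        // The effective delay for an item is the maximum of the two limiters,
        // mirroring the composition used in Daemon.New().
        limiter := workqueue.NewMaxOfRateLimiter(
                &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Every(5*time.Second), 1)},
                workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second),
        )

        key := int64(7) // the daemon enqueues the NodeState generation as the work item

        // Each failed sync calls AddRateLimited, which asks the limiter When(key);
        // the exponential part doubles per failure until it reaches the 60s cap.
        for i := 0; i < 8; i++ {
                fmt.Printf("retry %d: wait %v (requeues so far: %d)\n", i, limiter.When(key), limiter.NumRequeues(key))
        }

        // A successful sync calls Forget, which resets the per-item backoff.
        limiter.Forget(key)
        fmt.Println("after Forget:", limiter.When(key))
}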