k8snetworkplumbingwg / sriov-network-operator / build 11723997336

07 Nov 2024 01:29PM UTC. Coverage: 45.623% (+0.2%) from 45.458%

Pull Request #799: Rdma subsytem mode (merge 02c6b009c into 2b02ba1fe, committed via web-flow on github)

158 of 293 new or added lines in 12 files covered (53.92%).
3 existing lines in 2 files now uncovered.
6880 of 15080 relevant lines covered (45.62%).
0.5 hits per line.

Source file: /pkg/daemon/daemon.go (44.58% covered)

package daemon

import (
        "context"
        "fmt"
        "math/rand"
        "reflect"
        "sync"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/log"

        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
        sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
        snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
        plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
)

const (
        // updateDelay is the baseline speed at which we react to changes.  We don't
        // need to react in milliseconds as any change would involve rebooting the node.
        updateDelay = 5 * time.Second
        // maxUpdateBackoff is the maximum time to react to a change as we back off
        // in the face of errors.
        maxUpdateBackoff = 60 * time.Second
)

type Message struct {
        syncStatus    string
        lastSyncError string
}

type Daemon struct {
        client client.Client

        sriovClient snclientset.Interface
        // kubeClient allows interaction with Kubernetes, including the node we are running on.
        kubeClient kubernetes.Interface

        desiredNodeState *sriovnetworkv1.SriovNetworkNodeState
        currentNodeState *sriovnetworkv1.SriovNetworkNodeState

        // list of disabled plugins
        disabledPlugins []string

        loadedPlugins map[string]plugin.VendorPlugin

        HostHelpers helper.HostHelpersInterface

        platformHelpers platforms.Interface

        // channel used by callbacks to signal Run() of an error
        exitCh chan<- error

        // channel used to ensure all spawned goroutines exit when we exit.
        stopCh <-chan struct{}

        syncCh <-chan struct{}

        refreshCh chan<- Message

        mu *sync.Mutex

        disableDrain bool

        workqueue workqueue.RateLimitingInterface

        eventRecorder *EventRecorder

        featureGate featuregate.FeatureGate
}

func New(
        client client.Client,
        sriovClient snclientset.Interface,
        kubeClient kubernetes.Interface,
        hostHelpers helper.HostHelpersInterface,
        platformHelper platforms.Interface,
        exitCh chan<- error,
        stopCh <-chan struct{},
        syncCh <-chan struct{},
        refreshCh chan<- Message,
        er *EventRecorder,
        featureGates featuregate.FeatureGate,
        disabledPlugins []string,
) *Daemon {
        return &Daemon{
                client:           client,
                sriovClient:      sriovClient,
                kubeClient:       kubeClient,
                HostHelpers:      hostHelpers,
                platformHelpers:  platformHelper,
                exitCh:           exitCh,
                stopCh:           stopCh,
                syncCh:           syncCh,
                refreshCh:        refreshCh,
                desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
                        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
                        workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
                eventRecorder:   er,
                featureGate:     featureGates,
                disabledPlugins: disabledPlugins,
                mu:              &sync.Mutex{},
        }
}
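
The constructor above builds the workqueue's rate limiter from updateDelay and maxUpdateBackoff by combining a token bucket with a per-item exponential failure limiter. The following standalone sketch (not part of daemon.go; it only assumes the same client-go and golang.org/x/time/rate dependencies already imported above) illustrates how that combination behaves for a single item that keeps failing: the retry delay starts at one second and doubles on every requeue until it is capped at maxUpdateBackoff.

package main

import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/client-go/util/workqueue"
)

func main() {
        // Same limiter composition as in New() above, with the constants inlined.
        limiter := workqueue.NewMaxOfRateLimiter(
                &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5*time.Second), 1)},
                workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second),
        )

        item := int64(7) // the daemon keys work items by SriovNetworkNodeState generation
        for i := 1; i <= 8; i++ {
                // When() reports how long the queue would wait before retrying this item.
                fmt.Printf("requeue %d -> delay %v\n", i, limiter.When(item))
        }

        // Forget() resets the per-item backoff, mirroring dn.workqueue.Forget(obj)
        // after a successful sync in processNextWorkItem.
        limiter.Forget(item)
}

With these values the printed delays grow 1s, 2s, 4s and so on, then stay at 1m0s; the token bucket term contributes effectively no extra delay at this rate.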

// Run the config daemon
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
        log.Log.V(0).Info("Run()", "node", vars.NodeName)

        if vars.ClusterType == consts.ClusterTypeOpenshift {
                log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor())
        } else {
                log.Log.V(0).Info("Run(): start daemon.")
        }

        if !vars.UsingSystemdMode {
                log.Log.V(0).Info("Run(): daemon running in daemon mode")
                dn.HostHelpers.CheckRDMAEnabled()
                dn.HostHelpers.TryEnableTun()
                dn.HostHelpers.TryEnableVhostNet()
                err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
                if err != nil {
                        log.Log.Error(err, "failed to remove all the systemd sriov files")
                }
        } else {
                log.Log.V(0).Info("Run(): daemon running in systemd mode")
        }

        // Only watch our own SriovNetworkNodeState CR
        defer utilruntime.HandleCrash()
        defer dn.workqueue.ShutDown()

        if err := dn.prepareNMUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
        }
        if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs")
        }

        var timeout int64 = 5
        var metadataKey = "metadata.name"
        informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*15,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + vars.NodeName
                        lo.TimeoutSeconds = &timeout
                },
        )

        informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer()
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: dn.enqueueNodeState,
                UpdateFunc: func(old, new interface{}) {
                        dn.enqueueNodeState(new)
                },
        })

        cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*30,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + "default"
                },
        )

        cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer()
        cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    dn.operatorConfigAddHandler,
                UpdateFunc: dn.operatorConfigChangeHandler,
        })

        rand.Seed(time.Now().UnixNano())
        go cfgInformer.Run(dn.stopCh)
        time.Sleep(5 * time.Second)
        go informer.Run(dn.stopCh)
        if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
        }

        log.Log.Info("Starting workers")
        // Launch one worker to process work items
        go wait.Until(dn.runWorker, time.Second, stopCh)
        log.Log.Info("Started workers")

        for {
                select {
                case <-stopCh:
                        log.Log.V(0).Info("Run(): stop daemon")
                        return nil
                case err, more := <-exitCh:
                        log.Log.Error(err, "got an error")
                        if more {
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: err.Error(),
                                }
                        }
                        return err
                }
        }
}

func (dn *Daemon) runWorker() {
        for dn.processNextWorkItem() {
        }
}

func (dn *Daemon) enqueueNodeState(obj interface{}) {
        var ns *sriovnetworkv1.SriovNetworkNodeState
        var ok bool
        if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok {
                utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj))
                return
        }
        key := ns.GetGeneration()
        dn.workqueue.Add(key)
}

func (dn *Daemon) processNextWorkItem() bool {
        log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len())
        obj, shutdown := dn.workqueue.Get()
        if shutdown {
                return false
        }

        log.Log.V(2).Info("get item from queue", "item", obj.(int64))

        // We wrap this block in a func so we can defer dn.workqueue.Done.
        err := func(obj interface{}) error {
                // We call Done here so the workqueue knows we have finished
                // processing this item.
                defer dn.workqueue.Done(obj)
                var key int64
                var ok bool
                if key, ok = obj.(int64); !ok {
                        // As the item in the workqueue is actually invalid, we call
                        // Forget here.
                        dn.workqueue.Forget(obj)
                        utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
                        return nil
                }

                err := dn.nodeStateSyncHandler()
                if err != nil {
                        // Report the error message, and put the item back on the workqueue to retry.
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusFailed,
                                lastSyncError: err.Error(),
                        }
                        <-dn.syncCh
                        dn.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing: %s, requeuing", err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                dn.workqueue.Forget(obj)
                log.Log.Info("Successfully synced")
                return nil
        }(obj)

        if err != nil {
                utilruntime.HandleError(err)
        }

        return true
}

func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
        dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj)
}

func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) {
        oldCfg := old.(*sriovnetworkv1.SriovOperatorConfig)
        newCfg := new.(*sriovnetworkv1.SriovOperatorConfig)
        if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName {
                log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name)
                return
        }

        snolog.SetLogLevel(newCfg.Spec.LogLevel)

        newDisableDrain := newCfg.Spec.DisableDrain
        if dn.disableDrain != newDisableDrain {
                dn.disableDrain = newDisableDrain
                log.Log.Info("Set Disable Drain", "value", dn.disableDrain)
        }

        if !reflect.DeepEqual(oldCfg.Spec.FeatureGates, newCfg.Spec.FeatureGates) {
                dn.featureGate.Init(newCfg.Spec.FeatureGates)
                log.Log.Info("Updated featureGates", "featureGates", dn.featureGate.String())
        }

        vars.MlxPluginFwReset = dn.featureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate)
}

func (dn *Daemon) nodeStateSyncHandler() error {
        var err error
        // Get the latest NodeState
        var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""}
        dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        latest := dn.desiredNodeState.GetGeneration()
        log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest)

        // load plugins if they have not been loaded yet
        if len(dn.loadedPlugins) == 0 {
                dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins")
                        return err
                }
        }

        skipReconciliation := true
        // if the operator completed the drain operation we should continue with the configuration
        if !dn.isDrainCompleted() {
                if vars.UsingSystemdMode && dn.currentNodeState.GetGeneration() == latest {
                        serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host")
                                return err
                        }
                        postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host")
                                return err
                        }

                        // if the services don't exist we should continue and let the k8s plugin create the service files.
                        // This is only for k8s-based environments; for openshift the sriov-operator creates a machine config that applies
                        // the systemd service and reboots the node, so the config-daemon doesn't need to do anything.
                        if !(serviceEnabled && postNetworkServiceEnabled) {
                                sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed,
                                        LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+
                                                "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)}
                        } else {
                                sriovResult, err = systemd.ReadSriovResult()
                                if err != nil {
                                        log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host")
                                        return err
                                }
                        }
                        if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed {
                                log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError)

                                // add the error but don't requeue
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: sriovResult.LastSyncError,
                                }
                                <-dn.syncCh
                                return nil
                        }
                }

                skipReconciliation, err = dn.shouldSkipReconciliation(dn.desiredNodeState)
                if err != nil {
                        return err
                }
        }

        // we are done with the configuration, just return here
        if dn.currentNodeState.GetGeneration() == dn.desiredNodeState.GetGeneration() &&
                dn.desiredNodeState.Status.SyncStatus == consts.SyncStatusSucceeded && skipReconciliation {
                log.Log.Info("Current state and desire state are equal together with sync status succeeded nothing to do")
                return nil
        }

        dn.refreshCh <- Message{
                syncStatus:    consts.SyncStatusInProgress,
                lastSyncError: "",
        }
        // wait for the writer to refresh the status, then pull the latest node state again
        <-dn.syncCh

        // we need to load the latest status into our object;
        // if we don't, we can have a race here where the user removed the virtual functions but the operator didn't
        // trigger the refresh
        updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        dn.desiredNodeState.Status = updatedState.Status

        reqReboot := false
        reqDrain := false

        // check if any of the plugins require a drain or reboot of the node
        for k, p := range dn.loadedPlugins {
                d, r := false, false
                if dn.currentNodeState.GetName() == "" {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state")
                } else {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state")
                }
                d, r, err = p.OnNodeStateChange(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k)
                        return err
                }
                log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r)
                reqDrain = reqDrain || d
                reqReboot = reqReboot || r
        }

        // When running in systemd mode, check if the applied configuration is the latest one
        // or if there is a new config we need to apply.
        // When using systemd configuration we write the file
        if vars.UsingSystemdMode {
                log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host")
                systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode")
                        return err
                }
                if systemdConfModified {
                        // remove the existing result file to make sure that we will not use an outdated result, e.g. in case the
                        // systemd service was not triggered for some reason
                        err = systemd.RemoveSriovResult()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode")
                                return err
                        }
                }
                reqDrain = reqDrain || systemdConfModified
                // require reboot if drain needed for systemd mode
                reqReboot = reqReboot || systemdConfModified || reqDrain
                log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results",
                        "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

                err = systemd.WriteSriovSupportedNics()
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode")
                        return err
                }
        }

        log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon",
                "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

        // handle drain only if a plugin requested a drain, or we are already in a draining request state
        if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState,
                consts.NodeStateDrainAnnotationCurrent,
                consts.DrainIdle) {
                drainInProcess, err := dn.handleDrain(reqReboot)
                if err != nil {
                        log.Log.Error(err, "failed to handle drain")
                        return err
                }
                if drainInProcess {
                        return nil
                }
        }

        // apply the vendor plugins after we are done with the drain if needed
        for k, p := range dn.loadedPlugins {
                // Skip both the generic and virtual plugins; they are applied last
                if k != GenericPluginName && k != VirtualPluginName {
                        err := p.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k)
                                return err
                        }
                }
        }

        // if we don't need to reboot and we are not doing the configuration in systemd,
        // we apply the generic plugin
        if !reqReboot && !vars.UsingSystemdMode {
                // For BareMetal machines apply the generic plugin
                selectedPlugin, ok := dn.loadedPlugins[GenericPluginName]
                if ok {
                        // Apply generic plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply")
                                return err
                        }
                }

                // For Virtual machines apply the virtual plugin
                selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName]
                if ok {
                        // Apply virtual plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply")
                                return err
                        }
                }
        }

        if reqReboot {
                log.Log.Info("nodeStateSyncHandler(): reboot node")
                dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
                dn.rebootNode()
                return nil
        }

        // restart device plugin pod
        log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
        if err := dn.restartDevicePluginPod(); err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node")
        err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client)
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainIdle, dn.client); err != nil {
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): sync succeeded")
        dn.currentNodeState = dn.desiredNodeState.DeepCopy()
        if vars.UsingSystemdMode {
                dn.refreshCh <- Message{
                        syncStatus:    sriovResult.SyncStatus,
                        lastSyncError: sriovResult.LastSyncError,
                }
        } else {
                dn.refreshCh <- Message{
                        syncStatus:    consts.SyncStatusSucceeded,
                        lastSyncError: "",
                }
        }
        // wait for the writer to refresh the status
        <-dn.syncCh
        return nil
}
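
Throughout nodeStateSyncHandler, the pattern of sending on dn.refreshCh and then receiving from dn.syncCh is a handshake with the status writer: the daemon publishes the new sync status and blocks until the writer confirms it has been persisted to the SriovNetworkNodeState CR before continuing. The short standalone sketch below illustrates only that channel protocol; the writer goroutine here is a hypothetical stand-in, not the operator's actual status writer.

package main

import "fmt"

type Message struct {
        syncStatus    string
        lastSyncError string
}

func main() {
        refreshCh := make(chan Message)
        syncCh := make(chan struct{})

        // Hypothetical stand-in for the status writer goroutine that owns CR status updates.
        go func() {
                for msg := range refreshCh {
                        fmt.Printf("writer: set syncStatus=%q lastSyncError=%q\n", msg.syncStatus, msg.lastSyncError)
                        syncCh <- struct{}{} // unblock the daemon only after the status is written
                }
        }()

        // Daemon side of the handshake, as in nodeStateSyncHandler:
        refreshCh <- Message{syncStatus: "InProgress"}
        <-syncCh
        fmt.Println("daemon: status refreshed, continuing with the configuration")
}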

func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) {
        log.Log.V(0).Info("shouldSkipReconciliation()")
        var err error

        // Skip when SriovNetworkNodeState object has just been created.
        if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 {
                err = dn.HostHelpers.ClearPCIAddressFolder()
                if err != nil {
                        log.Log.Error(err, "failed to clear the PCI address configuration")
                        return false, err
                }

                log.Log.V(0).Info(
                        "shouldSkipReconciliation(): interface policy spec not yet set by controller for sriovNetworkNodeState",
                        "name", latestState.Name)
                if latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for writer to refresh status
                        <-dn.syncCh
                }
                return true, nil
        }

        // Verify changes in the status of the SriovNetworkNodeState CR.
        if dn.currentNodeState.GetGeneration() == latestState.GetGeneration() {
                log.Log.V(0).Info("shouldSkipReconciliation() verifying status change")
                for _, p := range dn.loadedPlugins {
                        // Verify changes in the status of the SriovNetworkNodeState CR.
                        log.Log.V(0).Info("shouldSkipReconciliation(): verifying status change for plugin", "pluginName", p.Name())
                        changed, err := p.CheckStatusChanges(latestState)
                        if err != nil {
                                return false, err
                        }
                        if changed {
                                log.Log.V(0).Info("shouldSkipReconciliation(): plugin require change", "pluginName", p.Name())
                                return false, nil
                        }
                }

                log.Log.V(0).Info("shouldSkipReconciliation(): Interface not changed")
                if latestState.Status.LastSyncError != "" ||
                        latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for writer to refresh the status
                        <-dn.syncCh
                }

                return true, nil
        }

        return false, nil
}

// handleDrain: adds the right annotation to the node and nodeState objects
// returns true if we need to finish the reconcile loop and wait for a new object
func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) {
        // done with the drain, we can continue with the configuration
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
                log.Log.Info("handleDrain(): the node complete the draining")
                return false, nil
        }

        // the operator is still draining the node so we reconcile
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
                log.Log.Info("handleDrain(): the node is still draining")
                return true, nil
        }

        // drain is disabled, we continue with the configuration
        if dn.disableDrain {
                log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig")
                return false, nil
        }

        if reqReboot {
                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node")
                err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client)
                if err != nil {
                        log.Log.Error(err, "applyDrainRequired(): Failed to annotate node")
                        return false, err
                }

                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState")
                if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                        consts.NodeStateDrainAnnotation,
                        consts.RebootRequired, dn.client); err != nil {
                        return false, err
                }

                // the node was annotated, we need to wait for the operator to finish the drain
                return true, nil
        }
        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node")
        err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client)
        if err != nil {
                log.Log.Error(err, "handleDrain(): Failed to annotate node")
                return false, err
        }

        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainRequired, dn.client); err != nil {
                return false, err
        }

        // the node was annotated, we need to wait for the operator to finish the drain
        return true, nil
}
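
handleDrain is essentially a small state machine over the node's drain annotations. The condensed sketch below restates only its decision table; the annotation string values are placeholders standing in for the operator's consts package, and the real method also annotates both the node and the nodeState object before waiting for the drain controller.

package main

import "fmt"

// Placeholder annotation values for illustration; the daemon reads the real ones from
// consts (DrainComplete, Draining, DrainRequired, RebootRequired).
const (
        drainComplete  = "DrainComplete"
        draining       = "Draining"
        drainRequired  = "Drain_Required"
        rebootRequired = "Reboot_Required"
)

// drainDecision mirrors handleDrain's control flow: it returns the annotation the daemon
// would request and whether the reconcile loop should stop and wait for the drain
// controller (the cases where handleDrain returns true).
func drainDecision(current string, disableDrain, reqReboot bool) (request string, wait bool) {
        switch {
        case current == drainComplete:
                return "", false // drain finished, continue configuring
        case current == draining:
                return "", true // operator still draining, reconcile again later
        case disableDrain:
                return "", false // drain disabled in SriovOperatorConfig
        case reqReboot:
                return rebootRequired, true // ask the operator to drain, then reboot
        default:
                return drainRequired, true // ask the operator to drain
        }
}

func main() {
        fmt.Println(drainDecision(draining, false, false))
        fmt.Println(drainDecision("Idle", false, true))
}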

func (dn *Daemon) restartDevicePluginPod() error {
        dn.mu.Lock()
        defer dn.mu.Unlock()
        log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")

        pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
                LabelSelector:   "app=sriov-device-plugin",
                FieldSelector:   "spec.nodeName=" + vars.NodeName,
                ResourceVersion: "0",
        })
        if err != nil {
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return nil
                }
                log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
                return err
        }

        if len(pods.Items) == 0 {
                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                return nil
        }

        for _, pod := range pods.Items {
                podToDelete := pod.Name
                log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
                err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): pod to delete not found")
                        continue
                }
                if err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
                        return err
                }

                if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
                        _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
                        if errors.IsNotFound(err) {
                                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                                return true, nil
                        }

                        if err != nil {
                                log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
                        } else {
                                log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
                        }
                        return false, nil
                }, dn.stopCh); err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
                        return err
                }
        }

        return nil
}

func (dn *Daemon) rebootNode() {
        log.Log.Info("rebootNode(): trigger node reboot")
        exit, err := dn.HostHelpers.Chroot(consts.Host)
        if err != nil {
                log.Log.Error(err, "rebootNode(): chroot command failed")
        }
        defer exit()
        // creates a new transient systemd unit to reboot the system.
        // We explicitly try to stop kubelet.service first, before anything else; this
        // way we ensure the rest of the system stays running, because kubelet may need
        // to do "graceful" shutdown by e.g. de-registering with a load balancer.
        // However note we use `;` instead of `&&` so we keep rebooting even
        // if kubelet failed to shut down - that way the machine will still eventually reboot
        // as systemd will time out the stop invocation.
        stdOut, StdErr, err := dn.HostHelpers.RunCommand("systemd-run", "--unit", "sriov-network-config-daemon-reboot",
                "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")

        if err != nil {
                log.Log.Error(err, "failed to reboot node", "stdOut", stdOut, "StdErr", StdErr)
        }
}

func (dn *Daemon) prepareNMUdevRule() error {
        // we need to remove the Red Hat Virtio network device from the udev rule configuration:
        // if we don't remove it when running the config-daemon on a virtual node, it will disconnect the node after a reboot.
        // Even though the operator should not be installed on virtual environments other than openstack,
        // we should not destroy the cluster if the operator is installed there
        supportedVfIds := []string{}
        for _, vfID := range sriovnetworkv1.GetSupportedVfIds() {
                if vfID == "0x1000" || vfID == "0x1041" {
                        continue
                }
                supportedVfIds = append(supportedVfIds, vfID)
        }

        return dn.HostHelpers.PrepareNMUdevRule(supportedVfIds)
}

// isDrainCompleted returns true if the current-state annotation is drain completed
func (dn *Daemon) isDrainCompleted() bool {
        return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete)
}