
k8snetworkplumbingwg / sriov-network-operator, build 8467233545 (push via GitHub, web-flow)
28 Mar 2024 12:26PM UTC. Coverage: 38.344% (+0.7%) from 37.675%.

Merge pull request #643 from ykulazhenkov/pr-turn-on-switchdev
[switchdev 9/9] Enable new switchdev implementation

189 of 289 new or added lines in 9 files covered (65.4%).
26 existing lines in 8 files now uncovered.
4798 of 12513 relevant lines covered (38.34%). 0.42 hits per line.

Source file: /pkg/daemon/daemon.go (47.37% covered)
package daemon

import (
        "context"
        "fmt"
        "math/rand"
        "os/exec"
        "sync"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/log"

        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
        sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
        snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
        plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
)

const (
        // updateDelay is the baseline speed at which we react to changes. We don't
        // need to react in milliseconds as any change would involve rebooting the node.
        updateDelay = 5 * time.Second
        // maxUpdateBackoff is the maximum time to react to a change as we back off
        // in the face of errors.
        maxUpdateBackoff = 60 * time.Second
)

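// Message is sent on the daemon's refresh channel to report the current
// sync status and the last sync error to the status writer.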
type Message struct {
        syncStatus    string
        lastSyncError string
}

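// Daemon reconciles the SriovNetworkNodeState of the node it runs on,
// loading vendor plugins and driving drain, reboot, and config updates.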
type Daemon struct {
        client client.Client

        sriovClient snclientset.Interface
        // kubeClient allows interaction with Kubernetes, including the node we are running on.
        kubeClient kubernetes.Interface

        desiredNodeState *sriovnetworkv1.SriovNetworkNodeState
        currentNodeState *sriovnetworkv1.SriovNetworkNodeState

        // list of disabled plugins
        disabledPlugins []string

        loadedPlugins map[string]plugin.VendorPlugin

        HostHelpers helper.HostHelpersInterface

        platformHelpers platforms.Interface

        // channel used by callbacks to signal Run() of an error
        exitCh chan<- error

        // channel used to ensure all spawned goroutines exit when we exit.
        stopCh <-chan struct{}

        syncCh <-chan struct{}

        refreshCh chan<- Message

        mu *sync.Mutex

        disableDrain bool

        workqueue workqueue.RateLimitingInterface

        eventRecorder *EventRecorder
}

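// New returns a Daemon wired to the given clients, host and platform helpers,
// and the channels used to coordinate with the caller and the status writer.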
func New(
        client client.Client,
        sriovClient snclientset.Interface,
        kubeClient kubernetes.Interface,
        hostHelpers helper.HostHelpersInterface,
        platformHelper platforms.Interface,
        exitCh chan<- error,
        stopCh <-chan struct{},
        syncCh <-chan struct{},
        refreshCh chan<- Message,
        er *EventRecorder,
        disabledPlugins []string,
) *Daemon {
        return &Daemon{
                client:           client,
                sriovClient:      sriovClient,
                kubeClient:       kubeClient,
                HostHelpers:      hostHelpers,
                platformHelpers:  platformHelper,
                exitCh:           exitCh,
                stopCh:           stopCh,
                syncCh:           syncCh,
                refreshCh:        refreshCh,
                desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
                        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
                        workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
                eventRecorder:   er,
                disabledPlugins: disabledPlugins,
        }
}

// Run the config daemon
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
        log.Log.V(0).Info("Run()", "node", vars.NodeName)

        if vars.ClusterType == consts.ClusterTypeOpenshift {
                log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor())
        } else {
                log.Log.V(0).Info("Run(): start daemon.")
        }

        if !vars.UsingSystemdMode {
                log.Log.V(0).Info("Run(): daemon running in daemon mode")
                dn.HostHelpers.TryEnableRdma()
                dn.HostHelpers.TryEnableTun()
                dn.HostHelpers.TryEnableVhostNet()
                err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
                if err != nil {
                        log.Log.Error(err, "failed to remove all the systemd sriov files")
                }
        } else {
                log.Log.V(0).Info("Run(): daemon running in systemd mode")
        }

        // Only watch own SriovNetworkNodeState CR
        defer utilruntime.HandleCrash()
        defer dn.workqueue.ShutDown()

        if err := dn.prepareNMUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
        }
        if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs")
        }

        var timeout int64 = 5
        var metadataKey = "metadata.name"
        dn.mu = &sync.Mutex{}
        informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*15,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + vars.NodeName
                        lo.TimeoutSeconds = &timeout
                },
        )

        informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer()
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: dn.enqueueNodeState,
                UpdateFunc: func(old, new interface{}) {
                        dn.enqueueNodeState(new)
                },
        })

        cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*30,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + "default"
                },
        )

        cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer()
        cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    dn.operatorConfigAddHandler,
                UpdateFunc: dn.operatorConfigChangeHandler,
        })

        rand.Seed(time.Now().UnixNano())
        go cfgInformer.Run(dn.stopCh)
        time.Sleep(5 * time.Second)
        go informer.Run(dn.stopCh)
        if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
        }

        log.Log.Info("Starting workers")
        // Launch one worker to process work items
        go wait.Until(dn.runWorker, time.Second, stopCh)
        log.Log.Info("Started workers")

        for {
                select {
                case <-stopCh:
                        log.Log.V(0).Info("Run(): stop daemon")
                        return nil
                case err, more := <-exitCh:
                        log.Log.Error(err, "got an error")
                        if more {
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: err.Error(),
                                }
                        }
                        return err
                }
        }
}

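// runWorker processes work items until the workqueue shuts down.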
func (dn *Daemon) runWorker() {
        for dn.processNextWorkItem() {
        }
}

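// enqueueNodeState adds the generation of a SriovNetworkNodeState to the
// workqueue; identical pending generations are deduplicated by the queue.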
func (dn *Daemon) enqueueNodeState(obj interface{}) {
        var ns *sriovnetworkv1.SriovNetworkNodeState
        var ok bool
        if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok {
                utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj))
                return
        }
        key := ns.GetGeneration()
        dn.workqueue.Add(key)
}

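// processNextWorkItem pops one generation off the workqueue and runs the
// sync handler for it; failed items are re-queued with rate limiting.
// It returns false only when the queue has been shut down.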
func (dn *Daemon) processNextWorkItem() bool {
        log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len())
        obj, shutdown := dn.workqueue.Get()
        if shutdown {
                return false
        }

        log.Log.V(2).Info("get item from queue", "item", obj.(int64))

        // We wrap this block in a func so we can defer dn.workqueue.Done.
        err := func(obj interface{}) error {
                // We call Done here so the workqueue knows we have finished
                // processing this item.
                defer dn.workqueue.Done(obj)
                var key int64
                var ok bool
                if key, ok = obj.(int64); !ok {
                        // As the item in the workqueue is actually invalid, we call
                        // Forget here.
                        dn.workqueue.Forget(obj)
                        utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
                        return nil
                }

                err := dn.nodeStateSyncHandler()
                if err != nil {
                        // Report the error message, and put the item back into the work queue for retry.
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusFailed,
                                lastSyncError: err.Error(),
                        }
                        <-dn.syncCh
                        dn.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing: %s, requeuing", err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                dn.workqueue.Forget(obj)
                log.Log.Info("Successfully synced")
                return nil
        }(obj)

        if err != nil {
                utilruntime.HandleError(err)
        }

        return true
}

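// operatorConfigAddHandler treats a newly seen SriovOperatorConfig as an
// update against an empty previous config.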
func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
        dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj)
}

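// operatorConfigChangeHandler applies changes from the default
// SriovOperatorConfig: the log level and the disable-drain setting.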
func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) {
        newCfg := new.(*sriovnetworkv1.SriovOperatorConfig)
        if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName {
                log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name)
                return
        }

        snolog.SetLogLevel(newCfg.Spec.LogLevel)

        newDisableDrain := newCfg.Spec.DisableDrain
        if dn.disableDrain != newDisableDrain {
                dn.disableDrain = newDisableDrain
                log.Log.Info("Set Disable Drain", "value", dn.disableDrain)
        }
}

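// nodeStateSyncHandler fetches the latest SriovNetworkNodeState for this
// node, asks the loaded plugins whether a drain or reboot is needed, applies
// the plugins, and updates the node and nodeState drain annotations.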
func (dn *Daemon) nodeStateSyncHandler() error {
        var err error
        // Get the latest NodeState
        var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""}
        dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        latest := dn.desiredNodeState.GetGeneration()
        log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest)

        if dn.currentNodeState.GetGeneration() == latest && !dn.isDrainCompleted() {
                if vars.UsingSystemdMode {
                        serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exists on host")
                                return err
                        }
                        postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exists on host")
                                return err
                        }

                        // If the services don't exist we should continue and let the k8s plugin create the service files.
                        // This is only for k8s-based environments; for OpenShift the sriov-operator creates a machine config
                        // that applies the systemd services and reboots the node, so the config-daemon doesn't need to do anything.
                        if !(serviceEnabled && postNetworkServiceEnabled) {
                                sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed,
                                        LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+
                                                "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)}
                        } else {
                                sriovResult, err = systemd.ReadSriovResult()
                                if err != nil {
                                        log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host")
                                        return err
                                }
                        }
                        if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed {
                                log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError)

                                // add the error but don't requeue
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: sriovResult.LastSyncError,
                                }
                                <-dn.syncCh
                                return nil
                        }
                }
                log.Log.V(0).Info("nodeStateSyncHandler(): Interface not changed")
                if dn.desiredNodeState.Status.LastSyncError != "" ||
                        dn.desiredNodeState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for writer to refresh the status
                        <-dn.syncCh
                }

                return nil
        }

        if dn.desiredNodeState.GetGeneration() == 1 && len(dn.desiredNodeState.Spec.Interfaces) == 0 {
                err = dn.HostHelpers.ClearPCIAddressFolder()
                if err != nil {
                        log.Log.Error(err, "failed to clear the PCI address configuration")
                        return err
                }

                log.Log.V(0).Info(
                        "nodeStateSyncHandler(): interface policy spec not yet set by controller for sriovNetworkNodeState",
                        "name", dn.desiredNodeState.Name)
                if dn.desiredNodeState.Status.SyncStatus != "Succeeded" {
                        dn.refreshCh <- Message{
                                syncStatus:    "Succeeded",
                                lastSyncError: "",
                        }
                        // wait for writer to refresh status
                        <-dn.syncCh
                }
                return nil
        }

        dn.refreshCh <- Message{
                syncStatus:    consts.SyncStatusInProgress,
                lastSyncError: "",
        }
        // wait for writer to refresh status then pull again the latest node state
        <-dn.syncCh

        // We need to load the latest status into our object: if we don't, we can have a race here
        // where the user removes the virtual functions but the operator didn't trigger the refresh.
        updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        dn.desiredNodeState.Status = updatedState.Status

        // load plugins if they have not been loaded yet
        if len(dn.loadedPlugins) == 0 {
                dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins")
                        return err
                }
        }

        reqReboot := false
        reqDrain := false

        // check if any of the plugins require draining or rebooting the node
        for k, p := range dn.loadedPlugins {
                d, r := false, false
                if dn.currentNodeState.GetName() == "" {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state")
                } else {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state")
                }
                d, r, err = p.OnNodeStateChange(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k)
                        return err
                }
                log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r)
                reqDrain = reqDrain || d
                reqReboot = reqReboot || r
        }

        // When running in systemd mode, check whether the applied configuration is the latest one
        // or there is a new config we need to apply; in systemd mode we write the configuration file to the host.
        if vars.UsingSystemdMode {
                log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host")
                systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode")
                        return err
                }
                if systemdConfModified {
                        // remove the existing result file to make sure that we will not use an outdated result,
                        // e.g. in case the systemd service was not triggered for some reason
                        err = systemd.RemoveSriovResult()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode")
                                return err
                        }
                }
                reqDrain = reqDrain || systemdConfModified
                // require reboot if drain is needed for systemd mode
                reqReboot = reqReboot || systemdConfModified || reqDrain
                log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results",
                        "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

                err = systemd.WriteSriovSupportedNics()
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode")
                        return err
                }
        }

        log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon",
                "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

        for k, p := range dn.loadedPlugins {
                // Skip the generic and virtual plugins; they are applied last
                if k != GenericPluginName && k != VirtualPluginName {
                        err := p.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k)
                                return err
                        }
                }
        }

        // handle drain only if a plugin requests a drain, or we are already in a draining request state
        if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState,
                consts.NodeStateDrainAnnotationCurrent,
                consts.DrainIdle) {
                if err := dn.handleDrain(reqReboot); err != nil {
                        log.Log.Error(err, "failed to handle drain")
                        return err
                }
        }

        if !reqReboot && !vars.UsingSystemdMode {
                // For bare-metal machines apply the generic plugin
                selectedPlugin, ok := dn.loadedPlugins[GenericPluginName]
                if ok {
                        // Apply generic plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): generic plugin failed to apply")
                                return err
                        }
                }

                // For virtual machines apply the virtual plugin
                selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName]
                if ok {
                        // Apply virtual plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply")
                                return err
                        }
                }
        }

        if reqReboot {
                log.Log.Info("nodeStateSyncHandler(): reboot node")
                dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
                dn.rebootNode()
                return nil
        }

        // restart the device plugin pod
        log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
        if err := dn.restartDevicePluginPod(); err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): failed to restart device plugin pod")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node")
        err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client)
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainIdle, dn.client); err != nil {
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): sync succeeded")
        dn.currentNodeState = dn.desiredNodeState.DeepCopy()
        if vars.UsingSystemdMode {
                dn.refreshCh <- Message{
                        syncStatus:    sriovResult.SyncStatus,
                        lastSyncError: sriovResult.LastSyncError,
                }
        } else {
                dn.refreshCh <- Message{
                        syncStatus:    consts.SyncStatusSucceeded,
                        lastSyncError: "",
                }
        }
        // wait for writer to refresh the status
        <-dn.syncCh
        return nil
}

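// handleDrain requests a drain (or a reboot, when reqReboot is set) by
// annotating the node and nodeState, unless draining is already in progress,
// already complete, or disabled in the SriovOperatorConfig.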
func (dn *Daemon) handleDrain(reqReboot bool) error {
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
                log.Log.Info("handleDrain(): the node completed draining")
                return nil
        }

        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
                log.Log.Info("handleDrain(): the node is still draining")
                return nil
        }

        if dn.disableDrain {
                log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig")
                return nil
        }

        if reqReboot {
                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node")
                err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client)
                if err != nil {
                        log.Log.Error(err, "handleDrain(): Failed to annotate node")
                        return err
                }

                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState")
                if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                        consts.NodeStateDrainAnnotation,
                        consts.RebootRequired, dn.client); err != nil {
                        return err
                }

                return nil
        }
        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node")
        err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client)
        if err != nil {
                log.Log.Error(err, "handleDrain(): Failed to annotate node")
                return err
        }

        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainRequired, dn.client); err != nil {
                return err
        }

        return nil
}

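// restartDevicePluginPod deletes the sriov-device-plugin pod scheduled on
// this node and waits until it is gone, so it restarts with the new config.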
func (dn *Daemon) restartDevicePluginPod() error {
        dn.mu.Lock()
        defer dn.mu.Unlock()
        log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")

        var podToDelete string
        pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
                LabelSelector:   "app=sriov-device-plugin",
                FieldSelector:   "spec.nodeName=" + vars.NodeName,
                ResourceVersion: "0",
        })
        if err != nil {
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return nil
                }
                log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
                return err
        }

        if len(pods.Items) == 0 {
                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                return nil
        }
        podToDelete = pods.Items[0].Name

        log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
        err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
        if errors.IsNotFound(err) {
                log.Log.Info("restartDevicePluginPod(): pod to delete not found")
                return nil
        }
        if err != nil {
                log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
                return err
        }

        if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
                _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return true, nil
                }

                if err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
                } else {
                        log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
                }
                return false, nil
        }, dn.stopCh); err != nil {
                log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
                return err
        }

        return nil
}

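// rebootNode reboots the host via a transient systemd unit, stopping the
// kubelet first so it can shut down gracefully.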
func (dn *Daemon) rebootNode() {
        log.Log.Info("rebootNode(): trigger node reboot")
        exit, err := dn.HostHelpers.Chroot(consts.Host)
        if err != nil {
                log.Log.Error(err, "rebootNode(): chroot command failed")
        }
        defer exit()
        // Create a new transient systemd unit to reboot the system.
        // We explicitly try to stop kubelet.service first, before anything else; this
        // way we ensure the rest of the system stays running, because kubelet may need
        // to do a "graceful" shutdown by e.g. de-registering with a load balancer.
        // However, note we use `;` instead of `&&` so we keep rebooting even
        // if kubelet fails to shut down - that way the machine will still eventually reboot
        // as systemd will time out the stop invocation.
        cmd := exec.Command("systemd-run", "--unit", "sriov-network-config-daemon-reboot",
                "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")

        if err := cmd.Run(); err != nil {
                log.Log.Error(err, "failed to reboot node")
        }
}

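// prepareNMUdevRule writes the udev rule that stops NetworkManager from
// managing VFs, skipping the Red Hat Virtio network device IDs.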
func (dn *Daemon) prepareNMUdevRule() error {
        // We need to remove the Red Hat Virtio network device from the udev rule configuration:
        // if we don't remove it, running the config-daemon on a virtual node will disconnect the node after a reboot.
        // Even though the operator should not be installed on virtual environments other than OpenStack,
        // we should not destroy the cluster if it is installed there.
        supportedVfIds := []string{}
        for _, vfID := range sriovnetworkv1.GetSupportedVfIds() {
                if vfID == "0x1000" || vfID == "0x1041" {
                        continue
                }
                supportedVfIds = append(supportedVfIds, vfID)
        }

        return dn.HostHelpers.PrepareNMUdevRule(supportedVfIds)
}

// isDrainCompleted returns true if the current-state annotation is drain completed
func (dn *Daemon) isDrainCompleted() bool {
        return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete)
}