k8snetworkplumbingwg / sriov-network-operator / build 11341895365

15 Oct 2024 08:08AM UTC coverage: 44.967% (-0.05%) from 45.017%

Pull Request #792: Clean Systemd files on exit
Merge 9c8ed6e3c into 8fe7a5e00 (github / web-flow)

10 of 52 new or added lines in 2 files covered. (19.23%)
1 existing line in 1 file now uncovered.
6669 of 14831 relevant lines covered (44.97%)
0.5 hits per line

Source file: /pkg/daemon/daemon.go (44.88% covered)

package daemon

import (
        "context"
        "fmt"
        "math/rand"
        "os/exec"
        "reflect"
        "sync"
        "time"

        "golang.org/x/time/rate"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/log"

        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
        sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
        snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
        plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
)

const (
        // updateDelay is the baseline speed at which we react to changes. We don't
        // need to react in milliseconds as any change would involve rebooting the node.
        updateDelay = 5 * time.Second
        // maxUpdateBackoff is the maximum time to react to a change as we back off
        // in the face of errors.
        maxUpdateBackoff = 60 * time.Second
)

type Message struct {
        syncStatus    string
        lastSyncError string
}

type Daemon struct {
        client client.Client

        sriovClient snclientset.Interface
        // kubeClient allows interaction with Kubernetes, including the node we are running on.
        kubeClient kubernetes.Interface

        desiredNodeState *sriovnetworkv1.SriovNetworkNodeState
        currentNodeState *sriovnetworkv1.SriovNetworkNodeState

        // list of disabled plugins
        disabledPlugins []string

        loadedPlugins map[string]plugin.VendorPlugin

        HostHelpers helper.HostHelpersInterface

        platformHelpers platforms.Interface

        // channel used by callbacks to signal Run() of an error
        exitCh chan<- error

        // channel used to ensure all spawned goroutines exit when we exit.
        stopCh <-chan struct{}

        syncCh <-chan struct{}

        refreshCh chan<- Message

        mu sync.Mutex

        disableDrain bool

        workqueue workqueue.RateLimitingInterface

        eventRecorder *EventRecorder

        featureGate featuregate.FeatureGate
}

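// New constructs a Daemon wired to the given clients, host helpers, and
// the channels it uses to report status back to its caller.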
func New(
        client client.Client,
        sriovClient snclientset.Interface,
        kubeClient kubernetes.Interface,
        hostHelpers helper.HostHelpersInterface,
        platformHelper platforms.Interface,
        exitCh chan<- error,
        stopCh <-chan struct{},
        syncCh <-chan struct{},
        refreshCh chan<- Message,
        er *EventRecorder,
        featureGates featuregate.FeatureGate,
        disabledPlugins []string,
) *Daemon {
        return &Daemon{
                client:           client,
                sriovClient:      sriovClient,
                kubeClient:       kubeClient,
                HostHelpers:      hostHelpers,
                platformHelpers:  platformHelper,
                exitCh:           exitCh,
                stopCh:           stopCh,
                syncCh:           syncCh,
                refreshCh:        refreshCh,
                desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{},
                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
                        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
                        workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
                eventRecorder:   er,
                featureGate:     featureGates,
                disabledPlugins: disabledPlugins,
        }
}

// Run starts the config daemon.
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
        log.Log.V(0).Info("Run()", "node", vars.NodeName)

        if vars.ClusterType == consts.ClusterTypeOpenshift {
                log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor())
        } else {
                log.Log.V(0).Info("Run(): start daemon.")
        }

        if !vars.UsingSystemdMode {
                log.Log.V(0).Info("Run(): daemon running in daemon mode")
                dn.HostHelpers.CheckRDMAEnabled()
                dn.HostHelpers.TryEnableTun()
                dn.HostHelpers.TryEnableVhostNet()
                err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
                if err != nil {
                        log.Log.Error(err, "failed to remove all the systemd sriov files")
                }
        } else {
                log.Log.V(0).Info("Run(): daemon running in systemd mode")
        }

        // Only watch our own SriovNetworkNodeState CR.
        defer utilruntime.HandleCrash()
        defer dn.workqueue.ShutDown()

        if err := dn.prepareNMUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
        }
        if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil {
                log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs")
        }

        var timeout int64 = 5
        var metadataKey = "metadata.name"
        informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*15,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + vars.NodeName
                        lo.TimeoutSeconds = &timeout
                },
        )

        informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer()
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: dn.enqueueNodeState,
                UpdateFunc: func(old, new interface{}) {
                        dn.enqueueNodeState(new)
                },
        })

        cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient,
                time.Second*30,
                vars.Namespace,
                func(lo *metav1.ListOptions) {
                        lo.FieldSelector = metadataKey + "=" + "default"
                },
        )

        cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer()
        cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    dn.operatorConfigAddHandler,
                UpdateFunc: dn.operatorConfigChangeHandler,
        })

        rand.Seed(time.Now().UnixNano())
        go cfgInformer.Run(dn.stopCh)
        time.Sleep(5 * time.Second)
        go informer.Run(dn.stopCh)
        if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
        }

        log.Log.Info("Starting workers")
        // Launch one worker to process work items.
        go wait.Until(dn.runWorker, time.Second, stopCh)
        log.Log.Info("Started workers")

        for {
                select {
                case <-stopCh:
                        // Clean files from the host if we are running in systemd mode and the
                        // node does not need to be rebooted.
                        dn.mu.Lock()
                        rebootrequired := utils.ObjectHasAnnotation(dn.desiredNodeState,
                                consts.NodeStateDrainAnnotation, consts.RebootRequired)
                        dn.mu.Unlock()

                        if vars.UsingSystemdMode && !rebootrequired {
                                err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
                                if err != nil {
                                        log.Log.Error(err, "failed to remove all the systemd sriov files")
                                        return err
                                }
                        }
                        log.Log.V(0).Info("Run(): stop daemon")
                        return nil
                case err, more := <-exitCh:
                        log.Log.Error(err, "got an error")
                        if more {
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: err.Error(),
                                }
                        }
                        return err
                }
        }
}

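// runWorker keeps processing workqueue items until the queue shuts down.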
func (dn *Daemon) runWorker() {
        for dn.processNextWorkItem() {
        }
}

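// enqueueNodeState adds the generation of a SriovNetworkNodeState object to
// the workqueue as the item to be processed.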
func (dn *Daemon) enqueueNodeState(obj interface{}) {
        var ns *sriovnetworkv1.SriovNetworkNodeState
        var ok bool
        if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok {
                utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj))
                return
        }
        key := ns.GetGeneration()
        dn.workqueue.Add(key)
}

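// processNextWorkItem takes one item off the workqueue and runs the sync
// handler for it; it returns false once the queue has been shut down.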
func (dn *Daemon) processNextWorkItem() bool {
        log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len())
        obj, shutdown := dn.workqueue.Get()
        if shutdown {
                return false
        }

        log.Log.V(2).Info("get item from queue", "item", obj.(int64))

        // We wrap this block in a func so we can defer dn.workqueue.Done.
        err := func(obj interface{}) error {
                // We call Done here so the workqueue knows we have finished
                // processing this item.
                defer dn.workqueue.Done(obj)
                var key int64
                var ok bool
                if key, ok = obj.(int64); !ok {
                        // As the item in the workqueue is actually invalid, we call
                        // Forget here.
                        dn.workqueue.Forget(obj)
                        utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
                        return nil
                }

                err := dn.nodeStateSyncHandler()
                if err != nil {
                        // Report the error message and put the item back into the work queue for retry.
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusFailed,
                                lastSyncError: err.Error(),
                        }
                        <-dn.syncCh
                        dn.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing: %s, requeuing", err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                dn.workqueue.Forget(obj)
                log.Log.Info("Successfully synced")
                return nil
        }(obj)

        if err != nil {
                utilruntime.HandleError(err)
        }

        return true
}

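// operatorConfigAddHandler treats an added SriovOperatorConfig as a change
// from an empty configuration.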
func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
        dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj)
}

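// operatorConfigChangeHandler reacts to changes in the default
// SriovOperatorConfig: it updates the log level, the disableDrain flag,
// and the feature gates.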
func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) {
        oldCfg := old.(*sriovnetworkv1.SriovOperatorConfig)
        newCfg := new.(*sriovnetworkv1.SriovOperatorConfig)
        if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName {
                log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name)
                return
        }

        snolog.SetLogLevel(newCfg.Spec.LogLevel)

        newDisableDrain := newCfg.Spec.DisableDrain
        if dn.disableDrain != newDisableDrain {
                dn.disableDrain = newDisableDrain
                log.Log.Info("Set Disable Drain", "value", dn.disableDrain)
        }

        if !reflect.DeepEqual(oldCfg.Spec.FeatureGates, newCfg.Spec.FeatureGates) {
                dn.featureGate.Init(newCfg.Spec.FeatureGates)
                log.Log.Info("Updated featureGates", "featureGates", dn.featureGate.String())
        }

        vars.MlxPluginFwReset = dn.featureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate)
}

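// nodeStateSyncHandler reconciles the node against the desired
// SriovNetworkNodeState: it asks the loaded plugins whether a drain or
// reboot is needed, applies the configuration, and reports the sync
// status over refreshCh.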
func (dn *Daemon) nodeStateSyncHandler() error {
        dn.mu.Lock()
        defer dn.mu.Unlock()
        var err error
        // Get the latest NodeState
        var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""}
        dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        latest := dn.desiredNodeState.GetGeneration()
        log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest)

        // load plugins if they have not been loaded yet
        if len(dn.loadedPlugins) == 0 {
                dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins")
                        return err
                }
        }

        skipReconciliation := true
        // if the operator completed the drain operation we should continue with the configuration
        if !dn.isDrainCompleted() {
                if vars.UsingSystemdMode && dn.currentNodeState.GetGeneration() == latest {
                        serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host")
                                return err
                        }
                        postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host")
                                return err
                        }

                        // if the services don't exist we should continue and let the k8s plugin create the service files.
                        // This is only for k8s-based environments; for openshift the sriov-operator creates a machine config that will apply
                        // the system service and reboot the node, so the config-daemon doesn't need to do anything.
                        if !(serviceEnabled && postNetworkServiceEnabled) {
                                sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed,
                                        LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+
                                                "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)}
                        } else {
                                sriovResult, err = systemd.ReadSriovResult()
                                if err != nil {
                                        log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host")
                                        return err
                                }
                        }
                        if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed {
                                log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError)

                                // add the error but don't requeue
                                dn.refreshCh <- Message{
                                        syncStatus:    consts.SyncStatusFailed,
                                        lastSyncError: sriovResult.LastSyncError,
                                }
                                <-dn.syncCh
                                return nil
                        }
                }

                skipReconciliation, err = dn.shouldSkipReconciliation(dn.desiredNodeState)
                if err != nil {
                        return err
                }
        }

        // we are done with the configuration, just return here
        if dn.currentNodeState.GetGeneration() == dn.desiredNodeState.GetGeneration() &&
                dn.desiredNodeState.Status.SyncStatus == consts.SyncStatusSucceeded && skipReconciliation {
                log.Log.Info("Current state and desire state are equal together with sync status succeeded nothing to do")
                return nil
        }

        dn.refreshCh <- Message{
                syncStatus:    consts.SyncStatusInProgress,
                lastSyncError: "",
        }
        // wait for the writer to refresh the status, then pull the latest node state again
        <-dn.syncCh

        // we need to load the latest status into our object;
        // if we don't, we can have a race where the user removed the virtual functions but the operator
        // didn't trigger the refresh
        updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
                return err
        }
        dn.desiredNodeState.Status = updatedState.Status

        reqReboot := false
        reqDrain := false

        // check if any of the plugins require draining or rebooting the node
        for k, p := range dn.loadedPlugins {
                d, r := false, false
                if dn.currentNodeState.GetName() == "" {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state")
                } else {
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state")
                }
                d, r, err = p.OnNodeStateChange(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k)
                        return err
                }
                log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r)
                reqDrain = reqDrain || d
                reqReboot = reqReboot || r
        }

        // When running in systemd mode, check whether the applied configuration is the latest one
        // or there is a new config we need to apply.
        // When using systemd configuration we write the file.
        if vars.UsingSystemdMode {
                log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host")
                systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState)
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode")
                        return err
                }
                if systemdConfModified {
                        // remove the existing result file to make sure that we will not use an outdated result,
                        // e.g. in case the systemd service was not triggered for some reason
                        err = systemd.RemoveSriovResult()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode")
                                return err
                        }
                }
                reqDrain = reqDrain || systemdConfModified
                // require a reboot if a drain is needed in systemd mode
                reqReboot = reqReboot || systemdConfModified || reqDrain
                log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results",
                        "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

                err = systemd.WriteSriovSupportedNics()
                if err != nil {
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode")
                        return err
                }
        }

        log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon",
                "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)

        // handle drain only if a plugin requested a drain, or we are already in a draining request state
        if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState,
                consts.NodeStateDrainAnnotationCurrent,
                consts.DrainIdle) {
                drainInProcess, err := dn.handleDrain(reqReboot)
                if err != nil {
                        log.Log.Error(err, "failed to handle drain")
                        return err
                }
                if drainInProcess {
                        return nil
                }
        }

        // apply the vendor plugins after we are done with the drain if needed
        for k, p := range dn.loadedPlugins {
                // Skip both the generic and virtual plugins; they are applied last
                if k != GenericPluginName && k != VirtualPluginName {
                        err := p.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k)
                                return err
                        }
                }
        }

        // if we don't need to reboot, or we are not doing the configuration in systemd,
        // we apply the generic plugin
        if !reqReboot && !vars.UsingSystemdMode {
                // For BareMetal machines apply the generic plugin
                selectedPlugin, ok := dn.loadedPlugins[GenericPluginName]
                if ok {
                        // Apply generic plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply")
                                return err
                        }
                }

                // For Virtual machines apply the virtual plugin
                selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName]
                if ok {
                        // Apply virtual plugin last
                        err = selectedPlugin.Apply()
                        if err != nil {
                                log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply")
                                return err
                        }
                }
        }

        if reqReboot {
                log.Log.Info("nodeStateSyncHandler(): reboot node")
                dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
                dn.rebootNode()
                return nil
        }

        // restart the device plugin pod
        log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
        if err := dn.restartDevicePluginPod(); err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node")
        err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client)
        if err != nil {
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node")
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainIdle, dn.client); err != nil {
                return err
        }

        log.Log.Info("nodeStateSyncHandler(): sync succeeded")
        dn.currentNodeState = dn.desiredNodeState.DeepCopy()
        if vars.UsingSystemdMode {
                dn.refreshCh <- Message{
                        syncStatus:    sriovResult.SyncStatus,
                        lastSyncError: sriovResult.LastSyncError,
                }
        } else {
                dn.refreshCh <- Message{
                        syncStatus:    consts.SyncStatusSucceeded,
                        lastSyncError: "",
                }
        }
        // wait for the writer to refresh the status
        <-dn.syncCh
        return nil
}

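// shouldSkipReconciliation returns true when there is nothing to do for the
// latest node state: either the interface spec has not been set by the
// controller yet, or the generation is unchanged and no plugin reports a
// status change.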
func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) {
        log.Log.V(0).Info("shouldSkipReconciliation()")
        var err error

        // Skip when the SriovNetworkNodeState object has just been created.
        if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 {
                err = dn.HostHelpers.ClearPCIAddressFolder()
                if err != nil {
                        log.Log.Error(err, "failed to clear the PCI address configuration")
                        return false, err
                }

                log.Log.V(0).Info(
                        "shouldSkipReconciliation(): interface policy spec not yet set by controller for sriovNetworkNodeState",
                        "name", latestState.Name)
                if latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for the writer to refresh the status
                        <-dn.syncCh
                }
                return true, nil
        }

        // Verify changes in the status of the SriovNetworkNodeState CR.
        if dn.currentNodeState.GetGeneration() == latestState.GetGeneration() {
                log.Log.V(0).Info("shouldSkipReconciliation() verifying status change")
                for _, p := range dn.loadedPlugins {
                        log.Log.V(0).Info("shouldSkipReconciliation(): verifying status change for plugin", "pluginName", p.Name())
                        changed, err := p.CheckStatusChanges(latestState)
                        if err != nil {
                                return false, err
                        }
                        if changed {
                                log.Log.V(0).Info("shouldSkipReconciliation(): plugin require change", "pluginName", p.Name())
                                return false, nil
                        }
                }

                log.Log.V(0).Info("shouldSkipReconciliation(): Interface not changed")
                if latestState.Status.LastSyncError != "" ||
                        latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
                        dn.refreshCh <- Message{
                                syncStatus:    consts.SyncStatusSucceeded,
                                lastSyncError: "",
                        }
                        // wait for the writer to refresh the status
                        <-dn.syncCh
                }

                return true, nil
        }

        return false, nil
}

// handleDrain adds the right annotation to the node and nodeState object and
// returns true if we need to finish the reconcile loop and wait for a new object.
func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) {
        // done with the drain, we can continue with the configuration
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
                log.Log.Info("handleDrain(): the node complete the draining")
                return false, nil
        }

        // the operator is still draining the node so we reconcile
        if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
                log.Log.Info("handleDrain(): the node is still draining")
                return true, nil
        }

        // drain is disabled, we continue with the configuration
        if dn.disableDrain {
                log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig")
                return false, nil
        }

        if reqReboot {
                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node")
                err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client)
                if err != nil {
                        log.Log.Error(err, "applyDrainRequired(): Failed to annotate node")
                        return false, err
                }

                log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState")
                if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                        consts.NodeStateDrainAnnotation,
                        consts.RebootRequired, dn.client); err != nil {
                        return false, err
                }

                // the node was annotated, we need to wait for the operator to finish the drain
                return true, nil
        }
        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node")
        err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client)
        if err != nil {
                log.Log.Error(err, "handleDrain(): Failed to annotate node")
                return false, err
        }

        log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState")
        if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
                consts.NodeStateDrainAnnotation,
                consts.DrainRequired, dn.client); err != nil {
                return false, err
        }

        // the node was annotated, we need to wait for the operator to finish the drain
        return true, nil
}

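// restartDevicePluginPod deletes the sriov-device-plugin pod scheduled on
// this node and waits until it is gone, so that it restarts with the new
// configuration.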
func (dn *Daemon) restartDevicePluginPod() error {
        log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")

        pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
                LabelSelector:   "app=sriov-device-plugin",
                FieldSelector:   "spec.nodeName=" + vars.NodeName,
                ResourceVersion: "0",
        })
        if err != nil {
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                        return nil
                }
                log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
                return err
        }

        if len(pods.Items) == 0 {
                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                return nil
        }

        for _, pod := range pods.Items {
                podToDelete := pod.Name
                log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
                err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
                if errors.IsNotFound(err) {
                        log.Log.Info("restartDevicePluginPod(): pod to delete not found")
                        continue
                }
                if err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
                        return err
                }

                if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
                        _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
                        if errors.IsNotFound(err) {
                                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
                                return true, nil
                        }

                        if err != nil {
                                log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
                        } else {
                                log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
                        }
                        return false, nil
                }, dn.stopCh); err != nil {
                        log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
                        return err
                }
        }

        return nil
}

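// rebootNode chroots into the host filesystem and launches a transient
// systemd unit that stops kubelet and then reboots the machine.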
func (dn *Daemon) rebootNode() {
        log.Log.Info("rebootNode(): trigger node reboot")
        exit, err := dn.HostHelpers.Chroot(consts.Host)
        if err != nil {
                log.Log.Error(err, "rebootNode(): chroot command failed")
        }
        defer exit()
        // Create a new transient systemd unit to reboot the system.
        // We explicitly try to stop kubelet.service first, before anything else; this
        // way we ensure the rest of the system stays running, because kubelet may need
        // to do a "graceful" shutdown by e.g. de-registering with a load balancer.
        // However, note we use `;` instead of `&&` so we keep rebooting even
        // if kubelet failed to shut down - that way the machine will still eventually reboot
        // as systemd will time out the stop invocation.
        cmd := exec.Command("systemd-run", "--unit", "sriov-network-config-daemon-reboot",
                "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")

        if err := cmd.Run(); err != nil {
                log.Log.Error(err, "failed to reboot node")
        }
}

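// prepareNMUdevRule collects the supported VF IDs, skipping the Red Hat
// Virtio device IDs, and passes them to the host helper that writes the
// udev rule disabling NetworkManager for those VFs.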
func (dn *Daemon) prepareNMUdevRule() error {
        // we need to remove the Red Hat Virtio network device from the udev rule configuration:
        // if we don't remove it, running the config-daemon on a virtual node will disconnect the node after a reboot.
        // Even though the operator should not be installed on virtual environments that are not openstack,
        // we should not destroy the cluster if the operator is installed there.
        supportedVfIds := []string{}
        for _, vfID := range sriovnetworkv1.GetSupportedVfIds() {
                if vfID == "0x1000" || vfID == "0x1041" {
                        continue
                }
                supportedVfIds = append(supportedVfIds, vfID)
        }

        return dn.HostHelpers.PrepareNMUdevRule(supportedVfIds)
}

// isDrainCompleted returns true if the current-state annotation is drain completed
func (dn *Daemon) isDrainCompleted() bool {
        return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete)
}