• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

k8snetworkplumbingwg / sriov-network-operator / 7889385247

13 Feb 2024 04:09PM UTC coverage: 30.011% (-0.2%) from 30.177%
7889385247

Pull #634

github

web-flow
Merge b4ccc1a84 into 1163ef9d1
Pull Request #634: Optimize node watcher in daemon

12 of 50 new or added lines in 1 file covered. (24.0%)

8 existing lines in 1 file now uncovered.

3538 of 11789 relevant lines covered (30.01%)

0.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.46
/pkg/daemon/daemon.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "fmt"
8
        "math/rand"
9
        "os"
10
        "os/exec"
11
        "path"
12
        "strconv"
13
        "strings"
14
        "sync"
15
        "time"
16

17
        mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
18
        daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"
19
        mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
20
        "golang.org/x/time/rate"
21
        corev1 "k8s.io/api/core/v1"
22
        "k8s.io/apimachinery/pkg/api/errors"
23
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24
        "k8s.io/apimachinery/pkg/labels"
25
        "k8s.io/apimachinery/pkg/types"
26
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27
        "k8s.io/apimachinery/pkg/util/strategicpatch"
28
        "k8s.io/apimachinery/pkg/util/wait"
29
        "k8s.io/client-go/informers"
30
        "k8s.io/client-go/kubernetes"
31
        listerv1 "k8s.io/client-go/listers/core/v1"
32
        "k8s.io/client-go/tools/cache"
33
        "k8s.io/client-go/tools/leaderelection"
34
        "k8s.io/client-go/tools/leaderelection/resourcelock"
35
        "k8s.io/client-go/util/workqueue"
36
        "k8s.io/kubectl/pkg/drain"
37
        "sigs.k8s.io/controller-runtime/pkg/log"
38

39
        sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
40
        snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
41
        sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
42
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
43
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper"
44
        snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
45
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
46
        plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins"
47
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd"
48
        "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
49
)
50

51
const (
	// updateDelay is the baseline speed at which we react to changes.  We don't
	// need to react in milliseconds as any change would involve rebooting the node.
	// It also seeds the workqueue's bucket rate limiter in New().
	updateDelay = 5 * time.Second
	// maxUpdateBackoff is the maximum time to react to a change as we back off
	// in the face of errors (cap for the workqueue's exponential failure limiter).
	maxUpdateBackoff = 60 * time.Second
)
59

60
// Message carries a sync-status update from the daemon's handlers to the
// status writer listening on refreshCh; the writer publishes it on the
// SriovNetworkNodeState object and then signals back via syncCh.
type Message struct {
	// syncStatus is one of the consts.SyncStatus* values
	// (e.g. InProgress, Succeeded, Failed).
	syncStatus string
	// lastSyncError is the error text accompanying a Failed status;
	// empty on success.
	lastSyncError string
}
64

65
// Daemon configures SR-IOV on the node it runs on, driven by that node's own
// SriovNetworkNodeState custom resource.
type Daemon struct {
	// client talks to the sriov-network-operator API group.
	client snclientset.Interface
	// kubeClient allows interaction with Kubernetes, including the node we are running on.
	kubeClient kubernetes.Interface

	// nodeState holds the last SriovNetworkNodeState this daemon acted on;
	// its generation is compared against the latest in nodeStateSyncHandler.
	nodeState *sriovnetworkv1.SriovNetworkNodeState

	// list of disabled plugins
	disabledPlugins []string

	// loadedPlugins maps plugin name to its vendor plugin instance,
	// populated lazily on the first sync.
	loadedPlugins map[string]plugin.VendorPlugin

	// HostHelpers wraps host-level operations (rdma/tun/vhost enablement, etc.).
	HostHelpers helper.HostHelpersInterface

	// platformHelpers answers platform questions (OpenShift flavor, Hypershift).
	platformHelpers platforms.Interface

	// channel used by callbacks to signal Run() of an error
	exitCh chan<- error

	// channel used to ensure all spawned goroutines exit when we exit.
	stopCh <-chan struct{}

	// syncCh is received from after posting on refreshCh, confirming the
	// status writer has refreshed the node state status.
	syncCh <-chan struct{}

	// refreshCh carries sync status/error messages to the status writer.
	refreshCh chan<- Message

	mu *sync.Mutex

	// drainer performs node drain operations before disruptive changes.
	drainer *drain.Helper

	// node is a deep copy of the most recently observed Node object for this host.
	node *corev1.Node

	// drainable is true when no other node carries a draining annotation
	// (recomputed by nodeUpdateHandler).
	drainable bool

	// disableDrain mirrors SriovOperatorConfig.Spec.DisableDrain.
	disableDrain bool

	// workqueue holds SriovNetworkNodeState generations awaiting processing.
	workqueue workqueue.RateLimitingInterface

	// mcpName is this node's machine config pool — presumably set by
	// getNodeMachinePool on OpenShift; not visible in this chunk, confirm.
	mcpName string

	eventRecorder *EventRecorder
}
107

108
const (
	// udevScriptsPath is the bundled udev loader script (not invoked in this chunk).
	udevScriptsPath = "/bindata/scripts/load-udev.sh"
	// annoKey is the node annotation used to coordinate draining across nodes.
	annoKey = "sriovnetwork.openshift.io/state"
	// Values for annoKey: idle, actively draining, or draining with the
	// machine config pool paused.
	annoIdle      = "Idle"
	annoDraining  = "Draining"
	annoMcpPaused = "Draining_MCP_Paused"
)
115

116
// writer adapts a log.Log-style logging function to the io.Writer interface,
// so it can be plugged into APIs (such as drain.Helper's Out/ErrOut) that
// expect a writer.
type writer struct {
	logFunc func(msg string, keysAndValues ...interface{})
}

// Write forwards the bytes to logFunc as a string. It always reports the
// full length as written and never returns an error.
func (w writer) Write(p []byte) (n int, err error) {
	msg := string(p)
	w.logFunc(msg)
	return len(msg), nil
}
126

127
func New(
128
        client snclientset.Interface,
129
        kubeClient kubernetes.Interface,
130
        hostHelpers helper.HostHelpersInterface,
131
        platformHelper platforms.Interface,
132
        exitCh chan<- error,
133
        stopCh <-chan struct{},
134
        syncCh <-chan struct{},
135
        refreshCh chan<- Message,
136
        er *EventRecorder,
137
        disabledPlugins []string,
138
) *Daemon {
1✔
139
        return &Daemon{
1✔
140
                client:          client,
1✔
141
                kubeClient:      kubeClient,
1✔
142
                HostHelpers:     hostHelpers,
1✔
143
                platformHelpers: platformHelper,
1✔
144
                exitCh:          exitCh,
1✔
145
                stopCh:          stopCh,
1✔
146
                syncCh:          syncCh,
1✔
147
                refreshCh:       refreshCh,
1✔
148
                nodeState:       &sriovnetworkv1.SriovNetworkNodeState{},
1✔
149
                drainer: &drain.Helper{
1✔
150
                        Client:              kubeClient,
1✔
151
                        Force:               true,
1✔
152
                        IgnoreAllDaemonSets: true,
1✔
153
                        DeleteEmptyDirData:  true,
1✔
154
                        GracePeriodSeconds:  -1,
1✔
155
                        Timeout:             90 * time.Second,
1✔
156
                        OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
1✔
157
                                verbStr := "Deleted"
×
158
                                if usingEviction {
×
159
                                        verbStr = "Evicted"
×
160
                                }
×
161
                                log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
×
162
                        },
163
                        Out:    writer{log.Log.Info},
164
                        ErrOut: writer{func(msg string, kv ...interface{}) { log.Log.Error(nil, msg, kv...) }},
×
165
                        Ctx:    context.Background(),
166
                },
167
                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
168
                        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
169
                        workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
170
                eventRecorder:   er,
171
                disabledPlugins: disabledPlugins,
172
        }
173
}
174

175
// Run starts the config daemon: it performs one-time host preparation,
// wires up three informers (this node's SriovNetworkNodeState, the "default"
// SriovOperatorConfig, and this node's Node object), launches a single
// workqueue worker, then blocks until stopCh closes or an error arrives on
// exitCh. The returned error is nil on a normal stop.
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
	log.Log.V(0).Info("Run()", "node", vars.NodeName)

	if vars.ClusterType == consts.ClusterTypeOpenshift {
		log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor())
	} else {
		log.Log.V(0).Info("Run(): start daemon.")
	}

	if !vars.UsingSystemdMode {
		// Daemon mode: this process applies config itself, so best-effort
		// enable kernel features and drop any stale systemd-mode artifacts.
		log.Log.V(0).Info("Run(): daemon running in daemon mode")
		dn.HostHelpers.TryEnableRdma()
		dn.HostHelpers.TryEnableTun()
		dn.HostHelpers.TryEnableVhostNet()
		err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
		if err != nil {
			log.Log.Error(err, "failed to remove all the systemd sriov files")
		}
	} else {
		log.Log.V(0).Info("Run(): daemon running in systemd mode")
	}

	// Only watch own SriovNetworkNodeState CR
	defer utilruntime.HandleCrash()
	defer dn.workqueue.ShutDown()

	// Udev preparation is best-effort: failures are logged, not fatal.
	if err := dn.prepareNMUdevRule(); err != nil {
		log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
	}
	if err := dn.tryCreateSwitchdevUdevRule(); err != nil {
		log.Log.Error(err, "failed to create udev files for switchdev")
	}

	var timeout int64 = 5
	var metadataKey = "metadata.name"
	dn.mu = &sync.Mutex{}
	// Informer restricted (by field selector) to this node's own
	// SriovNetworkNodeState object.
	informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.client,
		time.Second*15,
		vars.Namespace,
		func(lo *metav1.ListOptions) {
			lo.FieldSelector = metadataKey + "=" + vars.NodeName
			lo.TimeoutSeconds = &timeout
		},
	)

	informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: dn.enqueueNodeState,
		UpdateFunc: func(old, new interface{}) {
			dn.enqueueNodeState(new)
		},
	})

	// Informer restricted to the cluster-wide "default" SriovOperatorConfig.
	cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.client,
		time.Second*30,
		vars.Namespace,
		func(lo *metav1.ListOptions) {
			lo.FieldSelector = metadataKey + "=" + "default"
		},
	)

	cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer()
	cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dn.operatorConfigAddHandler,
		UpdateFunc: dn.operatorConfigChangeHandler,
	})

	// NOTE(review): rand.Seed is deprecated since Go 1.20 and unnecessary on
	// modern toolchains; confirm whether anything still relies on this seed.
	rand.Seed(time.Now().UnixNano())
	// Informer restricted to this host's Node object, used to keep dn.node fresh.
	nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(dn.kubeClient,
		time.Second*15,
		informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
			lo.FieldSelector = metadataKey + "=" + vars.NodeName
		}),
	)
	nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dn.currentNodeAddHandler,
		UpdateFunc: dn.currentNodeUpdateHandler,
		DeleteFunc: dn.currentNodeDeleteHandler,
	})
	go cfgInformer.Run(dn.stopCh)
	go nodeInformer.Run(dn.stopCh)
	// NOTE(review): this sleep appears to stagger the node-state informer
	// behind the config/node informers; the WaitForCacheSync below already
	// synchronizes on all three, so confirm the delay is still required.
	time.Sleep(5 * time.Second)
	go informer.Run(dn.stopCh)
	if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, nodeInformer.HasSynced, informer.HasSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	log.Log.Info("Starting workers")
	// Launch one worker to process
	go wait.Until(dn.runWorker, time.Second, stopCh)
	log.Log.Info("Started workers")

	// Main loop: exit on stop or error; otherwise re-assert the switchdev
	// udev rule every 30 seconds.
	for {
		select {
		case <-stopCh:
			log.Log.V(0).Info("Run(): stop daemon")
			return nil
		case err, more := <-exitCh:
			log.Log.Error(err, "got an error")
			// Only publish the failure if the channel is still open;
			// a closed channel yields the zero error.
			if more {
				dn.refreshCh <- Message{
					syncStatus:    consts.SyncStatusFailed,
					lastSyncError: err.Error(),
				}
			}
			return err
		case <-time.After(30 * time.Second):
			log.Log.V(2).Info("Run(): period refresh")
			if err := dn.tryCreateSwitchdevUdevRule(); err != nil {
				log.Log.V(2).Error(err, "Could not create udev rule")
			}
		}
	}
}
291

292
func (dn *Daemon) runWorker() {
1✔
293
        for dn.processNextWorkItem() {
2✔
294
        }
1✔
295
}
296

297
func (dn *Daemon) enqueueNodeState(obj interface{}) {
1✔
298
        var ns *sriovnetworkv1.SriovNetworkNodeState
1✔
299
        var ok bool
1✔
300
        if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok {
1✔
301
                utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj))
×
302
                return
×
303
        }
×
304
        key := ns.GetGeneration()
1✔
305
        dn.workqueue.Add(key)
1✔
306
}
307

308
// processNextWorkItem pops one generation number off the workqueue and runs
// nodeStateSyncHandler for it. It returns false only when the queue has been
// shut down, which terminates the worker loop.
func (dn *Daemon) processNextWorkItem() bool {
	log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len())
	obj, shutdown := dn.workqueue.Get()
	if shutdown {
		return false
	}

	// NOTE(review): this unchecked assertion panics on a non-int64 item
	// before the guarded conversion below gets a chance to Forget it.
	log.Log.V(2).Info("get item from queue", "item", obj.(int64))

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item.
		defer dn.workqueue.Done(obj)
		var key int64
		var ok bool
		if key, ok = obj.(int64); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here.
			dn.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
			return nil
		}

		err := dn.nodeStateSyncHandler()
		if err != nil {
			// Report the error via the status writer, wait for it to be
			// published (syncCh), then requeue the item with backoff.
			dn.refreshCh <- Message{
				syncStatus:    consts.SyncStatusFailed,
				lastSyncError: err.Error(),
			}
			<-dn.syncCh
			dn.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing: %s, requeuing", err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		dn.workqueue.Forget(obj)
		log.Log.Info("Successfully synced")
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
	}

	return true
}
356

357
// currentNodeAddHandler treats the initial add event for this host's Node
// object as an update with no previous state.
func (dn *Daemon) currentNodeAddHandler(obj interface{}) {
	dn.currentNodeUpdateHandler(nil, obj)
}
360

361
func (dn *Daemon) currentNodeUpdateHandler(old, new interface{}) {
1✔
362
        dn.node = new.(*corev1.Node).DeepCopy()
1✔
363
}
1✔
364

NEW
365
func (dn *Daemon) currentNodeDeleteHandler(obj interface{}) {
×
NEW
366
        log.Log.V(2).Info("nodeUpdateHandler(): node has been deleted", "name", vars.NodeName)
×
NEW
367
}
×
368

NEW
369
func (dn *Daemon) startAllNodesInformer(stopWatchCh chan struct{}) error {
×
NEW
370
        go func() {
×
NEW
371
                for {
×
NEW
372
                        select {
×
NEW
373
                        case <-dn.stopCh:
×
NEW
374
                                close(stopWatchCh)
×
NEW
375
                                return
×
NEW
376
                        case <-stopWatchCh:
×
NEW
377
                                return
×
378
                        }
379
                }
380
        }()
NEW
381
        nodeInformerFactory := informers.NewSharedInformerFactory(dn.kubeClient,
×
NEW
382
                time.Minute*5,
×
NEW
383
        )
×
NEW
384
        nodeLister := nodeInformerFactory.Core().V1().Nodes().Lister()
×
NEW
385
        nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
×
NEW
386
        nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
NEW
387
                AddFunc:    func(_ interface{}) { dn.nodeUpdateHandler(nodeLister) },
×
NEW
388
                UpdateFunc: func(_, _ interface{}) { dn.nodeUpdateHandler(nodeLister) },
×
389
        })
NEW
390
        go nodeInformer.Run(stopWatchCh)
×
NEW
391
        if ok := cache.WaitForCacheSync(stopWatchCh, nodeInformer.HasSynced); !ok {
×
NEW
392
                return fmt.Errorf("failed to wait for node caches to sync")
×
UNCOV
393
        }
×
NEW
394
        dn.nodeUpdateHandler(nodeLister)
×
NEW
395
        return nil
×
396
}
397

NEW
398
func (dn *Daemon) nodeUpdateHandler(nodeLister listerv1.NodeLister) {
×
NEW
399
        nodes, err := nodeLister.List(labels.Everything())
×
400
        if err != nil {
×
401
                log.Log.Error(err, "nodeUpdateHandler(): failed to list nodes")
×
402
                return
×
403
        }
×
404

405
        // Checking if other nodes are draining
UNCOV
406
        for _, otherNode := range nodes {
×
UNCOV
407
                if otherNode.GetName() == vars.NodeName {
×
UNCOV
408
                        continue
×
409
                }
410

411
                drainingAnnotationValue := otherNode.Annotations[annoKey]
×
412
                if drainingAnnotationValue == annoDraining || drainingAnnotationValue == annoMcpPaused {
×
413
                        log.Log.V(2).Info("nodeUpdateHandler(): node is not drainable, another node is draining",
×
414
                                "other-node", otherNode.Name, "annotation", annoKey+"="+drainingAnnotationValue)
×
415
                        dn.drainable = false
×
416
                        return
×
417
                }
×
418
        }
419

UNCOV
420
        if !dn.drainable {
×
UNCOV
421
                log.Log.V(2).Info("nodeUpdateHandler(): node is now drainable")
×
UNCOV
422
        }
×
423

UNCOV
424
        dn.drainable = true
×
425
}
426

427
// operatorConfigAddHandler treats the initial SriovOperatorConfig add event
// as a change from an empty config.
func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
	dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj)
}
430

431
func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) {
×
432
        newCfg := new.(*sriovnetworkv1.SriovOperatorConfig)
×
433
        snolog.SetLogLevel(newCfg.Spec.LogLevel)
×
434

×
435
        newDisableDrain := newCfg.Spec.DisableDrain
×
436
        if dn.disableDrain != newDisableDrain {
×
437
                dn.disableDrain = newDisableDrain
×
438
                log.Log.Info("Set Disable Drain", "value", dn.disableDrain)
×
439
        }
×
440
}
441

442
func (dn *Daemon) nodeStateSyncHandler() error {
1✔
443
        var err error
1✔
444
        // Get the latest NodeState
1✔
445
        var latestState *sriovnetworkv1.SriovNetworkNodeState
1✔
446
        var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""}
1✔
447
        latestState, err = dn.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
1✔
448
        if err != nil {
1✔
449
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
×
450
                return err
×
451
        }
×
452
        latest := latestState.GetGeneration()
1✔
453
        log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest)
1✔
454

1✔
455
        if vars.ClusterType == consts.ClusterTypeOpenshift && !dn.platformHelpers.IsHypershift() {
1✔
456
                if err = dn.getNodeMachinePool(); err != nil {
×
457
                        return err
×
458
                }
×
459
        }
460

461
        if dn.nodeState.GetGeneration() == latest {
1✔
462
                if vars.UsingSystemdMode {
×
463
                        serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
×
464
                        if err != nil {
×
465
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host")
×
466
                                return err
×
467
                        }
×
468
                        postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
×
469
                        if err != nil {
×
470
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host")
×
471
                                return err
×
472
                        }
×
473

474
                        // if the service doesn't exist we should continue to let the k8s plugin to create the service files
475
                        // this is only for k8s base environments, for openshift the sriov-operator creates a machine config to will apply
476
                        // the system service and reboot the node the config-daemon doesn't need to do anything.
477
                        if !(serviceEnabled && postNetworkServiceEnabled) {
×
478
                                sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed,
×
479
                                        LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+
×
480
                                                "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)}
×
481
                        } else {
×
482
                                sriovResult, err = systemd.ReadSriovResult()
×
483
                                if err != nil {
×
484
                                        log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host")
×
485
                                        return err
×
486
                                }
×
487
                        }
488
                        if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed {
×
489
                                log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError)
×
490

×
491
                                // add the error but don't requeue
×
492
                                dn.refreshCh <- Message{
×
493
                                        syncStatus:    consts.SyncStatusFailed,
×
494
                                        lastSyncError: sriovResult.LastSyncError,
×
495
                                }
×
496
                                <-dn.syncCh
×
497
                                return nil
×
498
                        }
×
499
                }
500
                log.Log.V(0).Info("nodeStateSyncHandler(): Interface not changed")
×
501
                if latestState.Status.LastSyncError != "" ||
×
502
                        latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
×
503
                        dn.refreshCh <- Message{
×
504
                                syncStatus:    consts.SyncStatusSucceeded,
×
505
                                lastSyncError: "",
×
506
                        }
×
507
                        // wait for writer to refresh the status
×
508
                        <-dn.syncCh
×
509
                }
×
510

511
                return nil
×
512
        }
513

514
        if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 {
1✔
515
                err = dn.HostHelpers.ClearPCIAddressFolder()
×
516
                if err != nil {
×
517
                        log.Log.Error(err, "failed to clear the PCI address configuration")
×
518
                        return err
×
519
                }
×
520

521
                log.Log.V(0).Info(
×
522
                        "nodeStateSyncHandler(): interface policy spec not yet set by controller for sriovNetworkNodeState",
×
523
                        "name", latestState.Name)
×
524
                if latestState.Status.SyncStatus != "Succeeded" {
×
525
                        dn.refreshCh <- Message{
×
526
                                syncStatus:    "Succeeded",
×
527
                                lastSyncError: "",
×
528
                        }
×
529
                        // wait for writer to refresh status
×
530
                        <-dn.syncCh
×
531
                }
×
532
                return nil
×
533
        }
534

535
        dn.refreshCh <- Message{
1✔
536
                syncStatus:    consts.SyncStatusInProgress,
1✔
537
                lastSyncError: "",
1✔
538
        }
1✔
539
        // wait for writer to refresh status then pull again the latest node state
1✔
540
        <-dn.syncCh
1✔
541

1✔
542
        // we need to load the latest status to our object
1✔
543
        // if we don't do it we can have a race here where the user remove the virtual functions but the operator didn't
1✔
544
        // trigger the refresh
1✔
545
        updatedState, err := dn.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{})
1✔
546
        if err != nil {
1✔
547
                log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName)
×
548
                return err
×
549
        }
×
550
        latestState.Status = updatedState.Status
1✔
551

1✔
552
        // load plugins if it has not loaded
1✔
553
        if len(dn.loadedPlugins) == 0 {
1✔
554
                dn.loadedPlugins, err = loadPlugins(latestState, dn.HostHelpers, dn.disabledPlugins)
×
555
                if err != nil {
×
556
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to enable vendor plugins")
×
557
                        return err
×
558
                }
×
559
        }
560

561
        reqReboot := false
1✔
562
        reqDrain := false
1✔
563

1✔
564
        // check if any of the plugins required to drain or reboot the node
1✔
565
        for k, p := range dn.loadedPlugins {
2✔
566
                d, r := false, false
1✔
567
                if dn.nodeState.GetName() == "" {
2✔
568
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state")
1✔
569
                } else {
1✔
570
                        log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state")
×
571
                }
×
572
                d, r, err = p.OnNodeStateChange(latestState)
1✔
573
                if err != nil {
1✔
574
                        log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k)
×
575
                        return err
×
576
                }
×
577
                log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r)
1✔
578
                reqDrain = reqDrain || d
1✔
579
                reqReboot = reqReboot || r
1✔
580
        }
581

582
        // When running using systemd check if the applied configuration is the latest one
583
        // or there is a new config we need to apply
584
        // When using systemd configuration we write the file
585
        if vars.UsingSystemdMode {
1✔
586
                log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host")
×
587
                systemdConfModified, err := systemd.WriteConfFile(latestState)
×
588
                if err != nil {
×
589
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode")
×
590
                        return err
×
591
                }
×
592
                if systemdConfModified {
×
593
                        // remove existing result file to make sure that we will not use outdated result, e.g. in case if
×
594
                        // systemd service was not triggered for some reason
×
595
                        err = systemd.RemoveSriovResult()
×
596
                        if err != nil {
×
597
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode")
×
598
                                return err
×
599
                        }
×
600
                }
601
                reqDrain = reqDrain || systemdConfModified
×
602
                reqReboot = reqReboot || systemdConfModified
×
603
                log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results",
×
604
                        "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)
×
605

×
606
                err = systemd.WriteSriovSupportedNics()
×
607
                if err != nil {
×
608
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode")
×
609
                        return err
×
610
                }
×
611
        }
612
        log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon",
1✔
613
                "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain)
1✔
614

1✔
615
        for k, p := range dn.loadedPlugins {
2✔
616
                // Skip both the general and virtual plugin apply them last
1✔
617
                if k != GenericPluginName && k != VirtualPluginName {
1✔
618
                        err := p.Apply()
×
619
                        if err != nil {
×
620
                                log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k)
×
621
                                return err
×
622
                        }
×
623
                }
624
        }
625
        if dn.platformHelpers.IsOpenshiftCluster() && !dn.platformHelpers.IsHypershift() {
1✔
626
                if err = dn.getNodeMachinePool(); err != nil {
×
627
                        return err
×
628
                }
×
629
        }
630
        if reqDrain {
1✔
631
                if !dn.isNodeDraining() {
×
632
                        if !dn.disableDrain {
×
633
                                ctx, cancel := context.WithCancel(context.TODO())
×
634
                                defer cancel()
×
635

×
636
                                log.Log.Info("nodeStateSyncHandler(): get drain lock for sriov daemon")
×
637
                                done := make(chan bool)
×
638
                                go dn.getDrainLock(ctx, done)
×
639
                                <-done
×
640
                        }
×
641
                }
642

643
                if dn.platformHelpers.IsOpenshiftCluster() && !dn.platformHelpers.IsHypershift() {
×
644
                        log.Log.Info("nodeStateSyncHandler(): pause MCP")
×
645
                        if err := dn.pauseMCP(); err != nil {
×
646
                                return err
×
647
                        }
×
648
                }
649

650
                if dn.disableDrain {
×
651
                        log.Log.Info("nodeStateSyncHandler(): disable drain is true skipping drain")
×
652
                } else {
×
653
                        log.Log.Info("nodeStateSyncHandler(): drain node")
×
654
                        if err := dn.drainNode(); err != nil {
×
655
                                return err
×
656
                        }
×
657
                }
658
        }
659

660
        if !reqReboot && !vars.UsingSystemdMode {
2✔
661
                // For BareMetal machines apply the generic plugin
1✔
662
                selectedPlugin, ok := dn.loadedPlugins[GenericPluginName]
1✔
663
                if ok {
2✔
664
                        // Apply generic plugin last
1✔
665
                        err = selectedPlugin.Apply()
1✔
666
                        if err != nil {
1✔
667
                                log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply")
×
668
                                return err
×
669
                        }
×
670
                }
671

672
                // For Virtual machines apply the virtual plugin
673
                selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName]
1✔
674
                if ok {
1✔
675
                        // Apply virtual plugin last
×
676
                        err = selectedPlugin.Apply()
×
677
                        if err != nil {
×
678
                                log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply")
×
679
                                return err
×
680
                        }
×
681
                }
682
        }
683

684
        if reqReboot {
1✔
685
                log.Log.Info("nodeStateSyncHandler(): reboot node")
×
686
                dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
×
687
                dn.rebootNode()
×
688
                return nil
×
689
        }
×
690

691
        // restart device plugin pod
692
        log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
1✔
693
        if err := dn.restartDevicePluginPod(); err != nil {
1✔
694
                log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod")
×
695
                return err
×
696
        }
×
697
        if dn.isNodeDraining() {
1✔
698
                if err := dn.completeDrain(); err != nil {
×
699
                        log.Log.Error(err, "nodeStateSyncHandler(): failed to complete draining")
×
700
                        return err
×
701
                }
×
702
        } else {
1✔
703
                if !dn.nodeHasAnnotation(annoKey, annoIdle) {
2✔
704
                        if err := dn.annotateNode(vars.NodeName, annoIdle); err != nil {
1✔
705
                                log.Log.Error(err, "nodeStateSyncHandler(): failed to annotate node")
×
706
                                return err
×
707
                        }
×
708
                }
709
        }
710
        log.Log.Info("nodeStateSyncHandler(): sync succeeded")
1✔
711
        dn.nodeState = latestState.DeepCopy()
1✔
712
        if vars.UsingSystemdMode {
1✔
713
                dn.refreshCh <- Message{
×
714
                        syncStatus:    sriovResult.SyncStatus,
×
715
                        lastSyncError: sriovResult.LastSyncError,
×
716
                }
×
717
        } else {
1✔
718
                dn.refreshCh <- Message{
1✔
719
                        syncStatus:    consts.SyncStatusSucceeded,
1✔
720
                        lastSyncError: "",
1✔
721
                }
1✔
722
        }
1✔
723
        // wait for writer to refresh the status
724
        <-dn.syncCh
1✔
725
        return nil
1✔
726
}
727

728
func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool {
1✔
729
        // Check if node already contains annotation
1✔
730
        if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) {
1✔
731
                return true
×
732
        }
×
733
        return false
1✔
734
}
735

736
// isNodeDraining: check if the node is draining
737
// both Draining and MCP paused labels will return true
738
func (dn *Daemon) isNodeDraining() bool {
1✔
739
        anno, ok := dn.node.Annotations[annoKey]
1✔
740
        if !ok {
2✔
741
                return false
1✔
742
        }
1✔
743

744
        return anno == annoDraining || anno == annoMcpPaused
×
745
}
746

747
func (dn *Daemon) completeDrain() error {
×
748
        if !dn.disableDrain {
×
749
                if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
×
750
                        return err
×
751
                }
×
752
        }
753

754
        if dn.platformHelpers.IsOpenshiftCluster() && !dn.platformHelpers.IsHypershift() {
×
755
                log.Log.Info("completeDrain(): resume MCP", "mcp-name", dn.mcpName)
×
756
                pausePatch := []byte("{\"spec\":{\"paused\":false}}")
×
757
                if _, err := dn.platformHelpers.GetMcClient().MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
×
758
                        log.Log.Error(err, "completeDrain(): failed to resume MCP", "mcp-name", dn.mcpName)
×
759
                        return err
×
760
                }
×
761
        }
762

763
        if err := dn.annotateNode(vars.NodeName, annoIdle); err != nil {
×
764
                log.Log.Error(err, "completeDrain(): failed to annotate node")
×
765
                return err
×
766
        }
×
767
        return nil
×
768
}
769

770
func (dn *Daemon) restartDevicePluginPod() error {
1✔
771
        dn.mu.Lock()
1✔
772
        defer dn.mu.Unlock()
1✔
773
        log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")
1✔
774

1✔
775
        var podToDelete string
1✔
776
        pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
1✔
777
                LabelSelector:   "app=sriov-device-plugin",
1✔
778
                FieldSelector:   "spec.nodeName=" + vars.NodeName,
1✔
779
                ResourceVersion: "0",
1✔
780
        })
1✔
781
        if err != nil {
1✔
782
                if errors.IsNotFound(err) {
×
783
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
×
784
                        return nil
×
785
                }
×
786
                log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
×
787
                return err
×
788
        }
789

790
        if len(pods.Items) == 0 {
1✔
791
                log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
×
792
                return nil
×
793
        }
×
794
        podToDelete = pods.Items[0].Name
1✔
795

1✔
796
        log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
1✔
797
        err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
1✔
798
        if errors.IsNotFound(err) {
1✔
799
                log.Log.Info("restartDevicePluginPod(): pod to delete not found")
×
800
                return nil
×
801
        }
×
802
        if err != nil {
1✔
803
                log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
×
804
                return err
×
805
        }
×
806

807
        if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
2✔
808
                _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
1✔
809
                if errors.IsNotFound(err) {
2✔
810
                        log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
1✔
811
                        return true, nil
1✔
812
                }
1✔
813

814
                if err != nil {
×
815
                        log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
×
816
                } else {
×
817
                        log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
×
818
                }
×
819
                return false, nil
×
820
        }, dn.stopCh); err != nil {
×
821
                log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
×
822
                return err
×
823
        }
×
824

825
        return nil
1✔
826
}
827

828
func (dn *Daemon) rebootNode() {
×
829
        log.Log.Info("rebootNode(): trigger node reboot")
×
830
        exit, err := dn.HostHelpers.Chroot(consts.Host)
×
831
        if err != nil {
×
832
                log.Log.Error(err, "rebootNode(): chroot command failed")
×
833
        }
×
834
        defer exit()
×
835
        // creates a new transient systemd unit to reboot the system.
×
836
        // We explictily try to stop kubelet.service first, before anything else; this
×
837
        // way we ensure the rest of system stays running, because kubelet may need
×
838
        // to do "graceful" shutdown by e.g. de-registering with a load balancer.
×
839
        // However note we use `;` instead of `&&` so we keep rebooting even
×
840
        // if kubelet failed to shutdown - that way the machine will still eventually reboot
×
841
        // as systemd will time out the stop invocation.
×
842
        cmd := exec.Command("systemd-run", "--unit", "sriov-network-config-daemon-reboot",
×
843
                "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")
×
844

×
845
        if err := cmd.Run(); err != nil {
×
846
                log.Log.Error(err, "failed to reboot node")
×
847
        }
×
848
}
849

850
func (dn *Daemon) annotateNode(node, value string) error {
1✔
851
        log.Log.Info("annotateNode(): Annotate node", "name", node, "value", value)
1✔
852

1✔
853
        oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), vars.NodeName, metav1.GetOptions{})
1✔
854
        if err != nil {
1✔
855
                log.Log.Error(err, "annotateNode(): Failed to get node, retrying", "name", node)
×
856
                return err
×
857
        }
×
858

859
        oldData, err := json.Marshal(oldNode)
1✔
860
        if err != nil {
1✔
861
                return err
×
862
        }
×
863

864
        newNode := oldNode.DeepCopy()
1✔
865
        if newNode.Annotations == nil {
2✔
866
                newNode.Annotations = map[string]string{}
1✔
867
        }
1✔
868
        if newNode.Annotations[annoKey] != value {
2✔
869
                newNode.Annotations[annoKey] = value
1✔
870
                newData, err := json.Marshal(newNode)
1✔
871
                if err != nil {
1✔
872
                        return err
×
873
                }
×
874
                patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
1✔
875
                if err != nil {
1✔
876
                        return err
×
877
                }
×
878
                _, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(),
1✔
879
                        vars.NodeName,
1✔
880
                        types.StrategicMergePatchType,
1✔
881
                        patchBytes,
1✔
882
                        metav1.PatchOptions{})
1✔
883
                if err != nil {
1✔
884
                        log.Log.Error(err, "annotateNode(): Failed to patch node", "name", node)
×
885
                        return err
×
886
                }
×
887
        }
888
        return nil
1✔
889
}
890

891
func (dn *Daemon) getNodeMachinePool() error {
×
892
        desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
×
893
        if !ok {
×
894
                log.Log.Error(nil, "getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
×
895
                return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
×
896
        }
×
897
        mc, err := dn.platformHelpers.GetMcClient().MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{})
×
898
        if err != nil {
×
899
                log.Log.Error(err, "getNodeMachinePool(): Failed to get the desired Machine Config")
×
900
                return err
×
901
        }
×
902
        for _, owner := range mc.OwnerReferences {
×
903
                if owner.Kind == "MachineConfigPool" {
×
904
                        dn.mcpName = owner.Name
×
905
                        return nil
×
906
                }
×
907
        }
908

909
        log.Log.Error(nil, "getNodeMachinePool(): Failed to find the MCP of the node")
×
910
        return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
×
911
}
912

913
func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
×
914
        var err error
×
915

×
916
        lock := &resourcelock.LeaseLock{
×
917
                LeaseMeta: metav1.ObjectMeta{
×
918
                        Name:      "config-daemon-draining-lock",
×
919
                        Namespace: vars.Namespace,
×
920
                },
×
921
                Client: dn.kubeClient.CoordinationV1(),
×
922
                LockConfig: resourcelock.ResourceLockConfig{
×
923
                        Identity: vars.NodeName,
×
924
                },
×
925
        }
×
926

×
927
        // start the leader election
×
928
        leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
×
929
                Lock:            lock,
×
930
                ReleaseOnCancel: true,
×
931
                LeaseDuration:   5 * time.Second,
×
932
                RenewDeadline:   3 * time.Second,
×
933
                RetryPeriod:     1 * time.Second,
×
934
                Callbacks: leaderelection.LeaderCallbacks{
×
935
                        OnStartedLeading: func(ctx context.Context) {
×
936
                                log.Log.V(2).Info("getDrainLock(): started leading")
×
NEW
937
                                stopAllNodesWatchChan := make(chan struct{})
×
NEW
938
                                err = dn.startAllNodesInformer(stopAllNodesWatchChan)
×
NEW
939
                                if err != nil {
×
NEW
940
                                        log.Log.Error(err, "getDrainLock(): Failed to start node informer")
×
NEW
941
                                        // The context was canceled, stopChannel closed. There is no need to block here
×
NEW
942
                                        done <- true
×
NEW
943
                                        return
×
NEW
944
                                }
×
NEW
945
                                log.Log.V(2).Info("getDrainLock(): started node informer")
×
946
                                for {
×
947
                                        time.Sleep(3 * time.Second)
×
948
                                        if dn.node.Annotations[annoKey] == annoMcpPaused {
×
949
                                                // The node in Draining_MCP_Paused state, no other node is draining. Skip drainable checking
×
950
                                                done <- true
×
951
                                                return
×
952
                                        }
×
953
                                        if dn.drainable {
×
NEW
954
                                                // Stop node informer
×
NEW
955
                                                close(stopAllNodesWatchChan)
×
956
                                                log.Log.V(2).Info("getDrainLock(): no other node is draining")
×
957
                                                err = dn.annotateNode(vars.NodeName, annoDraining)
×
958
                                                if err != nil {
×
959
                                                        log.Log.Error(err, "getDrainLock(): failed to annotate node")
×
960
                                                        continue
×
961
                                                }
962
                                                done <- true
×
963
                                                return
×
964
                                        }
965
                                        log.Log.V(2).Info("getDrainLock(): other node is draining, wait...")
×
966
                                }
967
                        },
968
                        OnStoppedLeading: func() {
×
969
                                log.Log.V(2).Info("getDrainLock(): stopped leading")
×
970
                        },
×
971
                },
972
        })
973
}
974

975
// pauseMCP pauses this node's MachineConfigPool so the MCO does not roll out
// changes while the sriov daemon drains and reconfigures the node. It watches
// the MCP via an informer and blocks until the pool has been paused and the
// node annotated with annoMcpPaused (at which point cancel() is called and
// <-ctx.Done() unblocks), or the daemon is stopped. If the node is already in
// the Draining_MCP_Paused state, the informer is never started and the
// function returns immediately.
func (dn *Daemon) pauseMCP() error {
	log.Log.Info("pauseMCP(): pausing MCP")
	var err error

	// Informer over all MachineConfigPools, resyncing every 30s; the event
	// handler below filters for dn.mcpName.
	mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.platformHelpers.GetMcClient(),
		time.Second*30,
	)
	mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

	// cancel() is invoked from inside the event handler once pausing is
	// complete; it stops the informer and unblocks the wait below.
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	// paused tracks whether WE have paused the MCP (seeded from the node
	// annotation left by a previous round); it is mutated by the handler.
	paused := dn.node.Annotations[annoKey] == annoMcpPaused

	mcpEventHandler := func(obj interface{}) {
		mcp := obj.(*mcfgv1.MachineConfigPool)
		// Ignore events for pools other than this node's.
		if mcp.GetName() != dn.mcpName {
			return
		}
		// Always get the latest object
		newMcp, err := dn.platformHelpers.GetMcClient().MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
		if err != nil {
			log.Log.V(2).Error(err, "pauseMCP(): Failed to get MCP", "mcp-name", dn.mcpName)
			return
		}
		// "Ready" means: not degraded, fully updated, and not mid-update.
		if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
			mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
			mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
			log.Log.V(2).Info("pauseMCP(): MCP is ready", "mcp-name", dn.mcpName)
			if paused {
				// We already paused the MCP and it is ready: done. Stop the
				// informer, which unblocks the <-ctx.Done() wait below.
				log.Log.V(2).Info("pauseMCP(): stop MCP informer")
				cancel()
				return
			}
			if newMcp.Spec.Paused {
				// Someone else paused the pool; wait for the next event.
				log.Log.V(2).Info("pauseMCP(): MCP was paused by other, wait...", "mcp-name", dn.mcpName)
				return
			}
			log.Log.Info("pauseMCP(): pause MCP", "mcp-name", dn.mcpName)
			pausePatch := []byte("{\"spec\":{\"paused\":true}}")
			_, err = dn.platformHelpers.GetMcClient().MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
			if err != nil {
				log.Log.V(2).Error(err, "pauseMCP(): failed to pause MCP", "mcp-name", dn.mcpName)
				return
			}
			// Record the paused state on the node so a restarted daemon can
			// pick up where we left off.
			err = dn.annotateNode(vars.NodeName, annoMcpPaused)
			if err != nil {
				log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node")
				return
			}
			paused = true
			return
		}
		// MCP is not ready. If we had paused it, release the pause so the MCO
		// can finish whatever it is processing, then wait for readiness again.
		if paused {
			log.Log.Info("pauseMCP(): MCP is processing, resume MCP", "mcp-name", dn.mcpName)
			pausePatch := []byte("{\"spec\":{\"paused\":false}}")
			_, err = dn.platformHelpers.GetMcClient().MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
			if err != nil {
				log.Log.V(2).Error(err, "pauseMCP(): fail to resume MCP", "mcp-name", dn.mcpName)
				return
			}
			err = dn.annotateNode(vars.NodeName, annoDraining)
			if err != nil {
				log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node")
				return
			}
			paused = false
		}
		// Deliberate fallthrough: log the not-ready state after a resume too.
		log.Log.Info("pauseMCP():MCP is not ready, wait...",
			"mcp-name", newMcp.GetName(), "mcp-conditions", newMcp.Status.Conditions)
	}

	// Both fresh adds and updates are funneled through the same handler.
	mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: mcpEventHandler,
		UpdateFunc: func(old, new interface{}) {
			mcpEventHandler(new)
		},
	})

	// The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
	// Only check MCP state if the node is not in Draining_MCP_Paused state
	if !paused {
		mcpInformerFactory.Start(ctx.Done())
		mcpInformerFactory.WaitForCacheSync(ctx.Done())
		<-ctx.Done()
	}

	return err
}
1063

1064
func (dn *Daemon) drainNode() error {
×
1065
        log.Log.Info("drainNode(): Update prepared")
×
1066
        var err error
×
1067

×
1068
        backoff := wait.Backoff{
×
1069
                Steps:    5,
×
1070
                Duration: 10 * time.Second,
×
1071
                Factor:   2,
×
1072
        }
×
1073
        var lastErr error
×
1074

×
1075
        log.Log.Info("drainNode(): Start draining")
×
1076
        dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated")
×
1077
        if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
×
1078
                err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true)
×
1079
                if err != nil {
×
1080
                        lastErr = err
×
1081
                        log.Log.Error(err, "cordon failed, retrying")
×
1082
                        return false, nil
×
1083
                }
×
1084
                err = drain.RunNodeDrain(dn.drainer, vars.NodeName)
×
1085
                if err == nil {
×
1086
                        return true, nil
×
1087
                }
×
1088
                lastErr = err
×
1089
                log.Log.Error(err, "Draining failed, retrying")
×
1090
                return false, nil
×
1091
        }); err != nil {
×
1092
                if err == wait.ErrWaitTimeout {
×
1093
                        log.Log.Error(err, "drainNode(): failed to drain node", "tries", backoff.Steps, "last-error", lastErr)
×
1094
                }
×
1095
                dn.eventRecorder.SendEvent("DrainNode", "Drain node failed")
×
1096
                log.Log.Error(err, "drainNode(): failed to drain node")
×
1097
                return err
×
1098
        }
1099
        dn.eventRecorder.SendEvent("DrainNode", "Drain node completed")
×
1100
        log.Log.Info("drainNode(): drain complete")
×
1101
        return nil
×
1102
}
1103

1104
// TODO: move this to host interface
1105
func (dn *Daemon) tryCreateSwitchdevUdevRule() error {
1✔
1106
        log.Log.V(2).Info("tryCreateSwitchdevUdevRule()")
1✔
1107
        nodeState, nodeStateErr := dn.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(
1✔
1108
                context.Background(),
1✔
1109
                vars.NodeName,
1✔
1110
                metav1.GetOptions{},
1✔
1111
        )
1✔
1112
        if nodeStateErr != nil {
1✔
1113
                log.Log.Error(nodeStateErr, "could not fetch node state, skip updating switchdev udev rules", "name", vars.NodeName)
×
1114
                return nil
×
1115
        }
×
1116

1117
        var newContent string
1✔
1118
        filePath := path.Join(vars.FilesystemRoot, "/host/etc/udev/rules.d/20-switchdev.rules")
1✔
1119

1✔
1120
        for _, ifaceStatus := range nodeState.Status.Interfaces {
2✔
1121
                if ifaceStatus.EswitchMode == sriovnetworkv1.ESwithModeSwitchDev {
1✔
1122
                        switchID, err := dn.HostHelpers.GetPhysSwitchID(ifaceStatus.Name)
×
1123
                        if err != nil {
×
1124
                                return err
×
1125
                        }
×
1126
                        portName, err := dn.HostHelpers.GetPhysPortName(ifaceStatus.Name)
×
1127
                        if err != nil {
×
1128
                                return err
×
1129
                        }
×
1130
                        newContent = newContent + fmt.Sprintf("SUBSYSTEM==\"net\", ACTION==\"add|move\", ATTRS{phys_switch_id}==\"%s\", ATTR{phys_port_name}==\"pf%svf*\", IMPORT{program}=\"/etc/udev/switchdev-vf-link-name.sh $attr{phys_port_name}\", NAME=\"%s_$env{NUMBER}\"\n", switchID, strings.TrimPrefix(portName, "p"), ifaceStatus.Name)
×
1131
                }
1132
        }
1133

1134
        oldContent, err := os.ReadFile(filePath)
1✔
1135
        // if oldContent = newContent, don't do anything
1✔
1136
        if err == nil && newContent == string(oldContent) {
1✔
1137
                return nil
×
1138
        }
×
1139

1140
        log.Log.V(2).Info("Old udev content and new content differ. Writing new content to file.",
1✔
1141
                "old-content", strings.TrimSuffix(string(oldContent), "\n"),
1✔
1142
                "new-content", strings.TrimSuffix(newContent, "\n"),
1✔
1143
                "path", filePath)
1✔
1144

1✔
1145
        // if the file does not exist or if oldContent != newContent
1✔
1146
        // write to file and create it if it doesn't exist
1✔
1147
        err = os.WriteFile(filePath, []byte(newContent), 0664)
1✔
1148
        if err != nil {
1✔
1149
                log.Log.Error(err, "tryCreateSwitchdevUdevRule(): fail to write file")
×
1150
                return err
×
1151
        }
×
1152

1153
        var stdout, stderr bytes.Buffer
1✔
1154
        cmd := exec.Command("/bin/bash", path.Join(vars.FilesystemRoot, udevScriptsPath))
1✔
1155
        cmd.Stdout = &stdout
1✔
1156
        cmd.Stderr = &stderr
1✔
1157
        if err := cmd.Run(); err != nil {
2✔
1158
                return err
1✔
1159
        }
1✔
1160
        log.Log.V(2).Info("tryCreateSwitchdevUdevRule(): stdout", "output", cmd.Stdout)
×
1161

×
1162
        i, err := strconv.Atoi(strings.TrimSpace(stdout.String()))
×
1163
        if err == nil {
×
1164
                if i == 0 {
×
1165
                        log.Log.V(2).Info("tryCreateSwitchdevUdevRule(): switchdev udev rules loaded")
×
1166
                } else {
×
1167
                        log.Log.V(2).Info("tryCreateSwitchdevUdevRule(): switchdev udev rules not loaded")
×
1168
                }
×
1169
        }
1170
        return nil
×
1171
}
1172

1173
func (dn *Daemon) prepareNMUdevRule() error {
1✔
1174
        // we need to remove the Red Hat Virtio network device from the udev rule configuration
1✔
1175
        // if we don't remove it when running the config-daemon on a virtual node it will disconnect the node after a reboot
1✔
1176
        // even that the operator should not be installed on virtual environments that are not openstack
1✔
1177
        // we should not destroy the cluster if the operator is installed there
1✔
1178
        supportedVfIds := []string{}
1✔
1179
        for _, vfID := range sriovnetworkv1.GetSupportedVfIds() {
2✔
1180
                if vfID == "0x1000" || vfID == "0x1041" {
1✔
1181
                        continue
×
1182
                }
1183
                supportedVfIds = append(supportedVfIds, vfID)
1✔
1184
        }
1185

1186
        return dn.HostHelpers.PrepareNMUdevRule(supportedVfIds)
1✔
1187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc