• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 26391629904

25 May 2026 08:39AM UTC coverage: 25.295%. First build
26391629904

Pull #6783

github

oilbeater
refactor(daemon): drop redundant Node DeepCopy in provider network sync

handleAddOrUpdateProviderNetwork passed node.DeepCopy() to cleanProviderNetwork
and initProviderNetwork, but both callees only read node.Name. Hand the shared
informer reference through instead, and document on each signature that the
node must not be mutated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Pull Request #6783: refactor(daemon): drop redundant Node DeepCopy in provider network sync

0 of 2 new or added lines in 1 file covered. (0.0%)

14538 of 57473 relevant lines covered (25.3%)

0.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.22
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        "k8s.io/apimachinery/pkg/selection"
21
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
22
        "k8s.io/apimachinery/pkg/util/wait"
23
        "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        listerv1 "k8s.io/client-go/listers/core/v1"
27
        netv1lister "k8s.io/client-go/listers/networking/v1"
28
        "k8s.io/client-go/tools/cache"
29
        "k8s.io/client-go/tools/record"
30
        "k8s.io/client-go/util/workqueue"
31
        "k8s.io/klog/v2"
32
        k8sexec "k8s.io/utils/exec"
33
        kubevirtv1 "kubevirt.io/api/core/v1"
34

35
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
36
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
37
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
38
        "github.com/kubeovn/kube-ovn/pkg/ovs"
39
        "github.com/kubeovn/kube-ovn/pkg/util"
40
)
41

42
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
43
type Controller struct {
44
        config *Configuration
45

46
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
47
        providerNetworksSynced          cache.InformerSynced
48
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
49
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
50

51
        vlansLister kubeovnlister.VlanLister
52
        vlansSynced cache.InformerSynced
53

54
        subnetsLister kubeovnlister.SubnetLister
55
        subnetsSynced cache.InformerSynced
56
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
57

58
        ovnEipsLister kubeovnlister.OvnEipLister
59
        ovnEipsSynced cache.InformerSynced
60

61
        podsLister     listerv1.PodLister
62
        podsSynced     cache.InformerSynced
63
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
64

65
        nodesLister     listerv1.NodeLister
66
        nodesSynced     cache.InformerSynced
67
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
68

69
        servicesLister listerv1.ServiceLister
70
        servicesSynced cache.InformerSynced
71
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
72

73
        caSecretLister listerv1.SecretLister
74
        caSecretSynced cache.InformerSynced
75
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
76

77
        serviceCIDRStore           *util.ServiceCIDRStore
78
        serviceCIDRLister          netv1lister.ServiceCIDRLister
79
        serviceCIDRSynced          cache.InformerSynced
80
        serviceCIDRInformerFactory informers.SharedInformerFactory
81

82
        recorder record.EventRecorder
83

84
        protocol string
85

86
        ControllerRuntime
87

88
        k8sExec k8sexec.Interface
89

90
        ipsecServiceStarted sync.Once
91

92
        // channel used for fdb sync
93
        fdbSyncChan   chan struct{}
94
        fdbSyncMutex  sync.Mutex
95
        vswitchClient ovs.Vswitch
96
}
97

98
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
99
        if rateLimiter == nil {
×
100
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
101
        }
×
102
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
103
}
104

105
// NewController init a daemon controller
106
func NewController(config *Configuration,
107
        stopCh <-chan struct{},
108
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
109
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
110
) (*Controller, error) {
×
111
        eventBroadcaster := record.NewBroadcaster()
×
112
        eventBroadcaster.StartLogging(klog.Infof)
×
113
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
114
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
115
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
116
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
117
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
118
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
119
        podInformer := podInformerFactory.Core().V1().Pods()
×
120
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
121
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
122
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
123

×
124
        controller := &Controller{
×
125
                config: config,
×
126

×
127
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
128
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
129
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
130
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
131

×
132
                vlansLister: vlanInformer.Lister(),
×
133
                vlansSynced: vlanInformer.Informer().HasSynced,
×
134

×
135
                subnetsLister: subnetInformer.Lister(),
×
136
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
137
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
138

×
139
                ovnEipsLister: ovnEipInformer.Lister(),
×
140
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
141

×
142
                podsLister:     podInformer.Lister(),
×
143
                podsSynced:     podInformer.Informer().HasSynced,
×
144
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
145

×
146
                nodesLister:     nodeInformer.Lister(),
×
147
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
148
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
149

×
150
                servicesLister: servicesInformer.Lister(),
×
151
                servicesSynced: servicesInformer.Informer().HasSynced,
×
152
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
153

×
154
                caSecretLister: caSecretInformer.Lister(),
×
155
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
156
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
157

×
158
                serviceCIDRStore: util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
159
                serviceCIDRInformerFactory: informers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
160
                        informers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
161
                                listOption.AllowWatchBookmarks = true
×
162
                        }),
×
163
                ),
164

165
                recorder: recorder,
166
                k8sExec:  k8sexec.New(),
167

168
                fdbSyncChan: make(chan struct{}, 1),
169
        }
170

171
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
172
        if err != nil {
×
173
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
174
        }
×
175
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
176

×
177
        if err = controller.initRuntime(); err != nil {
×
178
                return nil, err
×
179
        }
×
180

181
        podInformerFactory.Start(stopCh)
×
182
        nodeInformerFactory.Start(stopCh)
×
183
        kubeovnInformerFactory.Start(stopCh)
×
184
        caSecretInformerFactory.Start(stopCh)
×
185
        controller.StartServiceCIDRInformerFactory(stopCh)
×
186

×
187
        if !cache.WaitForCacheSync(stopCh,
×
188
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
189
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
190
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
191
        }
×
192

193
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
194
                AddFunc:    controller.enqueueAddProviderNetwork,
×
195
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
196
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
197
        }); err != nil {
×
198
                return nil, err
×
199
        }
×
200
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
201
                UpdateFunc: controller.enqueueUpdateVlan,
×
202
        }); err != nil {
×
203
                return nil, err
×
204
        }
×
205
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
206
                AddFunc:    controller.enqueueAddSubnet,
×
207
                UpdateFunc: controller.enqueueUpdateSubnet,
×
208
                DeleteFunc: controller.enqueueDeleteSubnet,
×
209
        }); err != nil {
×
210
                return nil, err
×
211
        }
×
212
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
213
                AddFunc:    controller.enqueueAddService,
×
214
                DeleteFunc: controller.enqueueDeleteService,
×
215
                UpdateFunc: controller.enqueueUpdateService,
×
216
        }); err != nil {
×
217
                util.LogFatalAndExit(err, "failed to add service event handler")
×
218
        }
×
219

220
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
221
                UpdateFunc: controller.enqueueUpdatePod,
×
222
        }); err != nil {
×
223
                return nil, err
×
224
        }
×
225
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
226
                AddFunc:    controller.enqueueAddIPSecCA,
×
227
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
228
        }); err != nil {
×
229
                return nil, err
×
230
        }
×
231
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
232
                UpdateFunc: controller.enqueueUpdateNode,
×
233
        }); err != nil {
×
234
                return nil, err
×
235
        }
×
236

237
        if controller.vswitchClient, err = ovs.NewVswitchClient("unix:/var/run/openvswitch/db.sock", 1, 3); err != nil {
×
238
                return nil, fmt.Errorf("failed to create vswitch client: %w", err)
×
239
        }
×
240

241
        return controller, nil
×
242
}
243

244
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
245
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
246
        klog.V(3).Infof("enqueue add CA %s", key)
×
247
        c.ipsecQueue.Add(key)
×
248
}
×
249

250
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
251
        oldSecret := oldObj.(*v1.Secret)
×
252
        newSecret := newObj.(*v1.Secret)
×
253
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
254
                // No changes in CA data, no need to enqueue
×
255
                return
×
256
        }
×
257

258
        key := cache.MetaObjectToName(newSecret).String()
×
259
        klog.V(3).Infof("enqueue update CA %s", key)
×
260
        c.ipsecQueue.Add(key)
×
261
}
262

263
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
264
        oldNode := oldObj.(*v1.Node)
×
265
        newNode := newObj.(*v1.Node)
×
266
        if newNode.Name != c.config.NodeName {
×
267
                return
×
268
        }
×
269
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
270
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
271
                c.updateNodeQueue.Add(newNode.Name)
×
272
        }
×
273
}
274

275
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
276
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
277
        klog.V(3).Infof("enqueue add provider network %s", key)
×
278
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
279
}
×
280

281
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
282
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
283
        klog.V(3).Infof("enqueue update provider network %s", key)
×
284
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
285
}
×
286

287
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
288
        var pn *kubeovnv1.ProviderNetwork
×
289
        switch t := obj.(type) {
×
290
        case *kubeovnv1.ProviderNetwork:
×
291
                pn = t
×
292
        case cache.DeletedFinalStateUnknown:
×
293
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
294
                if !ok {
×
295
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
296
                        return
×
297
                }
×
298
                pn = p
×
299
        default:
×
300
                klog.Warningf("unexpected type: %T", obj)
×
301
                return
×
302
        }
303

304
        key := cache.MetaObjectToName(pn).String()
×
305
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
306
        c.deleteProviderNetworkQueue.Add(pn)
×
307
}
308

309
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
310
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
311
        }
×
312
}
313

314
func (c *Controller) runDeleteProviderNetworkWorker() {
×
315
        for c.processNextDeleteProviderNetworkWorkItem() {
×
316
        }
×
317
}
318

319
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
320
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
321
        if shutdown {
×
322
                return false
×
323
        }
×
324

325
        err := func(key string) error {
×
326
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
327
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
328
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
329
                }
×
330
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
331
                return nil
×
332
        }(key)
333
        if err != nil {
×
334
                utilruntime.HandleError(err)
×
335
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
336
                return true
×
337
        }
×
338
        return true
×
339
}
340

341
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
342
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
343
        if shutdown {
×
344
                return false
×
345
        }
×
346

347
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
348
                defer c.deleteProviderNetworkQueue.Done(obj)
×
349
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
350
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
351
                }
×
352
                c.deleteProviderNetworkQueue.Forget(obj)
×
353
                return nil
×
354
        }(obj)
355
        if err != nil {
×
356
                utilruntime.HandleError(err)
×
357
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
358
                return true
×
359
        }
×
360
        return true
×
361
}
362

363
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
364
        klog.V(3).Infof("handle update provider network %s", key)
×
365
        node, err := c.nodesLister.Get(c.config.NodeName)
×
366
        if err != nil {
×
367
                klog.Error(err)
×
368
                return err
×
369
        }
×
370
        pn, err := c.providerNetworksLister.Get(key)
×
371
        if err != nil {
×
372
                if k8serrors.IsNotFound(err) {
×
373
                        return nil
×
374
                }
×
375
                klog.Error(err)
×
376
                return err
×
377
        }
378

379
        // Skip initialization if the provider network is being deleted.
380
        // Without this check, a requeue from a previous error could trigger re-init
381
        // during deletion, adding the NIC as a port to a dying bridge. This creates
382
        // stale OVS netdev cache entries that block exchange-link-name bridge creation.
383
        if !pn.DeletionTimestamp.IsZero() {
×
384
                klog.V(3).Infof("provider network %s is being deleted, skip init", key)
×
385
                return nil
×
386
        }
×
387

388
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
389
        if err != nil {
×
390
                klog.Error(err)
×
391
                return err
×
392
        }
×
393

394
        if excluded {
×
395
                c.recordProviderNetworkErr(pn.Name, "")
×
NEW
396
                return c.cleanProviderNetwork(pn.DeepCopy(), node)
×
397
        }
×
NEW
398
        return c.initProviderNetwork(pn.DeepCopy(), node)
×
399
}
400

401
func providerNetworkNic(pn *kubeovnv1.ProviderNetwork, nodeName string) string {
1✔
402
        for _, item := range pn.Spec.CustomInterfaces {
1✔
403
                if slices.Contains(item.Nodes, nodeName) {
×
404
                        return item.Interface
×
405
                }
×
406
        }
407
        return pn.Spec.DefaultInterface
1✔
408
}
409

410
// initProviderNetwork configures the provider network on the local node.
411
// node must not be mutated; it is backed by the shared informer cache.
412
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
413
        nic := providerNetworkNic(pn, node.Name)
×
414

×
415
        patch := util.KVPatch{
×
416
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
417
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
418
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
419
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
420
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
421
        }
×
422

×
423
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
424
        for _, vlanName := range pn.Status.Vlans {
×
425
                vlan, err := c.vlansLister.Get(vlanName)
×
426
                if err != nil {
×
427
                        if k8serrors.IsNotFound(err) {
×
428
                                klog.Infof("vlan %s not found", vlanName)
×
429
                                continue
×
430
                        }
431
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
432
                        return err
×
433
                }
434
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
435
        }
436
        // always add trunk 0 so that the ovs bridge can communicate with the external network
437
        vlans.Add("0")
×
438

×
439
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
440
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
441
                parts := strings.SplitN(nic, ".", 2)
×
442
                parentIf := parts[0]
×
443
                if !util.CheckInterfaceExists(nic) {
×
444
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
445
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
446
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
447
                                return err
×
448
                        }
×
449
                } else {
×
450
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
451
                }
×
452
        }
453

454
        // VLAN sub-interface handling - use map for efficiency
455
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
456

×
457
        // Process explicitly specified VLAN interfaces
×
458
        if len(pn.Spec.VlanInterfaces) > 0 {
×
459
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
460
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
461
                        if util.CheckInterfaceExists(vlanIfName) {
×
462
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
463
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
464
                                if err != nil {
×
465
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
466
                                        continue
×
467
                                }
468
                                vlanInterfaceMap[vlanIfName] = vlanID
×
469
                                vlans.Add(strconv.Itoa(vlanID))
×
470
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
471
                        } else {
×
472
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
473
                        }
×
474
                }
475
        }
476

477
        // Auto-detection of additional VLAN interfaces (if enabled)
478
        if pn.Spec.PreserveVlanInterfaces {
×
479
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
480
                vlanIDs := util.DetectVlanInterfaces(nic)
×
481
                for _, vlanID := range vlanIDs {
×
482
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
483
                        // Only add if not already explicitly specified
×
484
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
485
                                vlanInterfaceMap[vlanIfName] = vlanID
×
486
                                vlans.Add(strconv.Itoa(vlanID))
×
487
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
488
                        } else {
×
489
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
490
                        }
×
491
                }
492
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
493
        }
494

495
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, nic, vlanInterfaceMap); err != nil {
×
496
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
497
                return err
×
498
        }
×
499

500
        var mtu int
×
501
        var err error
×
502
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
503
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
504
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
505
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
506
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
507
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
508
                }
×
509
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
510
                return err
×
511
        }
512

513
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
514
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
515
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
516
        if len(vlanInterfaceMap) > 0 {
×
517
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
518
        }
×
519
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
520
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
521
                return err
×
522
        }
×
523
        c.recordProviderNetworkErr(pn.Name, "")
×
524
        return nil
×
525
}
526

527
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
528
        pod, err := c.podsLister.Pods(c.config.PodNamespace).Get(c.config.PodName)
×
529
        if err != nil {
×
530
                klog.Errorf("failed to get pod %s/%s, %v", c.config.PodNamespace, c.config.PodName, err)
×
531
                return
×
532
        }
×
533

534
        patch := util.KVPatch{}
×
535
        if pod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
536
                if errMsg == "" {
×
537
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
538
                } else {
×
539
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
540
                }
×
541
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
542
                        klog.Errorf("failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
543
                        return
×
544
                }
×
545
        }
546
}
547

548
// cleanProviderNetwork tears down the provider network from the local node.
549
// node must not be mutated; it is backed by the shared informer cache.
550
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
1✔
551
        patch := util.KVPatch{
1✔
552
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
1✔
553
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
1✔
554
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
1✔
555
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
1✔
556
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
1✔
557
        }
1✔
558
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
1✔
559
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
560
                return err
×
561
        }
×
562

563
        if err := c.ovsCleanProviderNetwork(pn.Name, providerNetworkNic(pn, node.Name)); err != nil {
2✔
564
                return err
1✔
565
        }
1✔
566

567
        return c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil)
×
568
}
569

570
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
571
        if err := c.ovsCleanProviderNetwork(pn.Name, providerNetworkNic(pn, c.config.NodeName)); err != nil {
×
572
                klog.Error(err)
×
573
                return err
×
574
        }
×
575

576
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil); err != nil {
×
577
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
578
                return err
×
579
        }
×
580

581
        node, err := c.nodesLister.Get(c.config.NodeName)
×
582
        if err != nil {
×
583
                klog.Error(err)
×
584
                return err
×
585
        }
×
586
        if len(node.Labels) == 0 {
×
587
                return nil
×
588
        }
×
589

590
        patch := util.KVPatch{
×
591
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
592
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
593
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
594
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
595
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
596
        }
×
597
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
598
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
599
                return err
×
600
        }
×
601

602
        return nil
×
603
}
604

605
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
606
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
607
        newVlan := newObj.(*kubeovnv1.Vlan)
×
608
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
609
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
610
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
611
        }
×
612
}
613

614
// subnetEvent is the item placed on subnetQueue. An add sets only newObj, a
// delete sets only oldObj and an update sets both.
type subnetEvent struct {
	oldObj any
	newObj any
}
617

618
// serviceEvent is the item placed on serviceQueue. An add sets only newObj, a
// delete sets only oldObj and an update sets both.
type serviceEvent struct {
	oldObj any
	newObj any
}
621

622
func (c *Controller) enqueueAddSubnet(obj any) {
×
623
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
624
}
×
625

626
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
627
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
628
}
×
629

630
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
631
        switch t := obj.(type) {
×
632
        case *kubeovnv1.Subnet:
×
633
                c.subnetQueue.Add(&subnetEvent{oldObj: t})
×
634
        case cache.DeletedFinalStateUnknown:
×
635
                subnet, ok := t.Obj.(*kubeovnv1.Subnet)
×
636
                if !ok {
×
637
                        klog.Warningf("unexpected object type in tombstone: %T", t.Obj)
×
638
                        return
×
639
                }
×
640
                c.subnetQueue.Add(&subnetEvent{oldObj: subnet})
×
641
        default:
×
642
                klog.Warningf("unexpected type: %T", obj)
×
643
        }
644
}
645

646
func (c *Controller) runSubnetWorker() {
×
647
        for c.processNextSubnetWorkItem() {
×
648
        }
×
649
}
650

651
func (c *Controller) enqueueAddService(obj any) {
×
652
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
653
}
×
654

655
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
656
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
657
}
×
658

659
func (c *Controller) enqueueDeleteService(obj any) {
×
660
        switch t := obj.(type) {
×
661
        case *v1.Service:
×
662
                c.serviceQueue.Add(&serviceEvent{oldObj: t})
×
663
        case cache.DeletedFinalStateUnknown:
×
664
                svc, ok := t.Obj.(*v1.Service)
×
665
                if !ok {
×
666
                        klog.Warningf("unexpected object type in tombstone: %T", t.Obj)
×
667
                        return
×
668
                }
×
669
                c.serviceQueue.Add(&serviceEvent{oldObj: svc})
×
670
        default:
×
671
                klog.Warningf("unexpected type: %T", obj)
×
672
        }
673
}
674

675
func (c *Controller) runAddOrUpdateServiceWorker() {
×
676
        for c.processNextServiceWorkItem() {
×
677
        }
×
678
}
679

680
func (c *Controller) processNextSubnetWorkItem() bool {
×
681
        obj, shutdown := c.subnetQueue.Get()
×
682
        if shutdown {
×
683
                return false
×
684
        }
×
685

686
        err := func(obj *subnetEvent) error {
×
687
                defer c.subnetQueue.Done(obj)
×
688
                c.requestFdbSync()
×
689
                if err := c.reconcileRouters(obj); err != nil {
×
690
                        c.subnetQueue.AddRateLimited(obj)
×
691
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
692
                }
×
693
                c.subnetQueue.Forget(obj)
×
694
                return nil
×
695
        }(obj)
696
        if err != nil {
×
697
                utilruntime.HandleError(err)
×
698
                return true
×
699
        }
×
700
        return true
×
701
}
702

703
func (c *Controller) processNextServiceWorkItem() bool {
×
704
        obj, shutdown := c.serviceQueue.Get()
×
705
        if shutdown {
×
706
                return false
×
707
        }
×
708

709
        err := func(obj *serviceEvent) error {
×
710
                defer c.serviceQueue.Done(obj)
×
711
                if err := c.reconcileServices(obj); err != nil {
×
712
                        c.serviceQueue.AddRateLimited(obj)
×
713
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
714
                }
×
715
                c.serviceQueue.Forget(obj)
×
716
                return nil
×
717
        }(obj)
718
        if err != nil {
×
719
                utilruntime.HandleError(err)
×
720
                return true
×
721
        }
×
722
        return true
×
723
}
724

725
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
726
        oldPod := oldObj.(*v1.Pod)
×
727
        newPod := newObj.(*v1.Pod)
×
728
        key := cache.MetaObjectToName(newPod).String()
×
729

×
730
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
731
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
732
                oldPod.Annotations[util.IngressBurstAnnotation] != newPod.Annotations[util.IngressBurstAnnotation] ||
×
733
                oldPod.Annotations[util.EgressBurstAnnotation] != newPod.Annotations[util.EgressBurstAnnotation] ||
×
734
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
735
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
736
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
737
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
738
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
739
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
740
                c.updatePodQueue.Add(key)
×
741
                return
×
742
        }
×
743

744
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
745
        if err != nil {
×
746
                return
×
747
        }
×
748
        for _, multiNet := range attachNets {
×
749
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
750
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
751
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
752
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
753
                                oldPod.Annotations[fmt.Sprintf(util.IngressBurstAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressBurstAnnotationTemplate, provider)] ||
×
754
                                oldPod.Annotations[fmt.Sprintf(util.EgressBurstAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressBurstAnnotationTemplate, provider)] ||
×
755
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
756
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
757
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
758
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
759
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
760
                                c.updatePodQueue.Add(key)
×
761
                        }
×
762
                }
763
        }
764
}
765

766
func (c *Controller) runUpdatePodWorker() {
×
767
        for c.processNextUpdatePodWorkItem() {
×
768
        }
×
769
}
770

771
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
772
        key, shutdown := c.updatePodQueue.Get()
×
773
        if shutdown {
×
774
                return false
×
775
        }
×
776

777
        err := func(key string) error {
×
778
                defer c.updatePodQueue.Done(key)
×
779
                if err := c.handleUpdatePod(key); err != nil {
×
780
                        c.updatePodQueue.AddRateLimited(key)
×
781
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
782
                }
×
783
                c.updatePodQueue.Forget(key)
×
784
                return nil
×
785
        }(key)
786
        if err != nil {
×
787
                utilruntime.HandleError(err)
×
788
                return true
×
789
        }
×
790
        return true
×
791
}
792

793
// isVMLauncherPodAlive checks whether any KubeVirt launcher pod exists for the given VMI name.
794
// It tries multiple lookup strategies to handle different KubeVirt versions:
795
//  1. VirtualMachineInstanceIDLabel (vmi.kubevirt.io/id) — unique, available in KubeVirt >= 1.7
796
//  2. DeprecatedVirtualMachineNameLabel (vm.kubevirt.io/name) — may not be unique when VM hostname is set
797
//  3. DomainAnnotation (kubevirt.io/domain) — always the real VMI name, handles names > 63 chars
798
func (c *Controller) isVMLauncherPodAlive(namespace, vmiName, iface string) bool {
1✔
799
        // Try the new unique label first (KubeVirt >= 1.7)
1✔
800
        selector := labels.SelectorFromSet(map[string]string{kubevirtv1.VirtualMachineInstanceIDLabel: vmiName})
1✔
801
        launcherPods, err := c.podsLister.Pods(namespace).List(selector)
1✔
802
        if err != nil {
1✔
803
                klog.Errorf("failed to list launcher pods by %s for vmi %s/%s: %v",
×
804
                        kubevirtv1.VirtualMachineInstanceIDLabel, namespace, vmiName, err)
×
805
                return false
×
806
        }
×
807
        if len(launcherPods) > 0 {
2✔
808
                klog.V(5).Infof("found %d launcher pod(s) by %s for vmi %s/%s, keeping ovs interface %s",
1✔
809
                        len(launcherPods), kubevirtv1.VirtualMachineInstanceIDLabel, namespace, vmiName, iface)
1✔
810
                return true
1✔
811
        }
1✔
812

813
        // Fall back to the deprecated label for older KubeVirt versions
814
        selector = labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: vmiName})
1✔
815
        launcherPods, err = c.podsLister.Pods(namespace).List(selector)
1✔
816
        if err != nil {
1✔
817
                klog.Errorf("failed to list launcher pods by %s for vmi %s/%s: %v",
×
818
                        kubevirtv1.DeprecatedVirtualMachineNameLabel, namespace, vmiName, err)
×
819
                return false
×
820
        }
×
821
        if len(launcherPods) > 0 {
2✔
822
                klog.V(5).Infof("found %d launcher pod(s) by %s for vmi %s/%s, keeping ovs interface %s",
1✔
823
                        len(launcherPods), kubevirtv1.DeprecatedVirtualMachineNameLabel, namespace, vmiName, iface)
1✔
824
                return true
1✔
825
        }
1✔
826

827
        // Final fallback: for VMI names > 63 chars where VirtualMachineInstanceIDLabel is hashed,
828
        // match by kubevirt.io/domain annotation which always contains the real VMI name.
829
        // Use the deprecated label as a selector to narrow down to virt-launcher pods only,
830
        // then match the annotation for the exact VMI name.
831
        selector = labels.Everything()
1✔
832
        if req, err := labels.NewRequirement(kubevirtv1.DeprecatedVirtualMachineNameLabel, selection.Exists, nil); err == nil {
2✔
833
                selector = selector.Add(*req)
1✔
834
        }
1✔
835
        candidates, err := c.podsLister.Pods(namespace).List(selector)
1✔
836
        if err != nil {
1✔
837
                klog.Errorf("failed to list virt-launcher pods in namespace %s for vmi annotation lookup: %v", namespace, err)
×
838
                return false
×
839
        }
×
840
        for _, p := range candidates {
2✔
841
                if p.Annotations[kubevirtv1.DomainAnnotation] == vmiName {
2✔
842
                        klog.V(5).Infof("found launcher pod %s by %s annotation for vmi %s/%s, keeping ovs interface %s",
1✔
843
                                p.Name, kubevirtv1.DomainAnnotation, namespace, vmiName, iface)
1✔
844
                        return true
1✔
845
                }
1✔
846
        }
847

848
        return false
1✔
849
}
850

851
func (c *Controller) gcInterfaces() {
×
852
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
853
        if err != nil {
×
854
                klog.Errorf("failed to list interface pod map: %v", err)
×
855
                return
×
856
        }
×
857
        for iface, pod := range interfacePodMap {
×
858
                parts := strings.Split(pod, "/")
×
859
                if len(parts) < 3 {
×
860
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
861
                        continue
×
862
                }
863

864
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
865
                if strings.Contains(errText, "No such device") {
×
866
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
867
                        if err := ovs.CleanInterface(iface); err != nil {
×
868
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
869
                        }
×
870
                        continue
×
871
                }
872

873
                if _, err = c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
874
                        if !k8serrors.IsNotFound(err) {
×
875
                                klog.Errorf("failed to get pod %s/%s: %v", podNamespace, podName, err)
×
876
                                continue
×
877
                        }
878

879
                        // Pod not found by name. Check if this might be a KubeVirt VM.
880
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VMI name (not the launcher pod name).
881
                        // Try to find launcher pods using KubeVirt labels/annotations.
882
                        if c.isVMLauncherPodAlive(podNamespace, podName, iface) {
×
883
                                continue
×
884
                        }
885

886
                        // No pod on this node and no launcher pod found - safe to delete
887
                        klog.Infof("pod %s/%s not found on this node, delete ovs interface %s", podNamespace, podName, iface)
×
888
                        if err = ovs.CleanInterface(iface); err != nil {
×
889
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
890
                        }
×
891
                }
892
        }
893
}
894

895
func (c *Controller) runIPSecWorker() {
×
896
        for c.processNextIPSecWorkItem() {
×
897
        }
×
898
}
899

900
func (c *Controller) processNextIPSecWorkItem() bool {
×
901
        key, shutdown := c.ipsecQueue.Get()
×
902
        if shutdown {
×
903
                return false
×
904
        }
×
905
        defer c.ipsecQueue.Done(key)
×
906

×
907
        err := func(key string) error {
×
908
                if err := c.SyncIPSecKeys(key); err != nil {
×
909
                        c.ipsecQueue.AddRateLimited(key)
×
910
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
911
                }
×
912
                c.ipsecQueue.Forget(key)
×
913
                return nil
×
914
        }(key)
915
        if err != nil {
×
916
                utilruntime.HandleError(err)
×
917
                return true
×
918
        }
×
919
        return true
×
920
}
921

922
func (c *Controller) runUpdateNodeWorker() {
×
923
        for c.processNextUpdateNodeWorkItem() {
×
924
        }
×
925
}
926

927
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
928
        key, shutdown := c.updateNodeQueue.Get()
×
929
        if shutdown {
×
930
                return false
×
931
        }
×
932

933
        err := func(key string) error {
×
934
                defer c.updateNodeQueue.Done(key)
×
935
                if err := c.handleUpdateNode(key); err != nil {
×
936
                        c.updateNodeQueue.AddRateLimited(key)
×
937
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
938
                }
×
939
                c.updateNodeQueue.Forget(key)
×
940
                return nil
×
941
        }(key)
942
        if err != nil {
×
943
                utilruntime.HandleError(err)
×
944
                return true
×
945
        }
×
946
        return true
×
947
}
948

949
func (c *Controller) handleUpdateNode(key string) error {
×
950
        node, err := c.nodesLister.Get(key)
×
951
        if err != nil {
×
952
                if k8serrors.IsNotFound(err) {
×
953
                        return nil
×
954
                }
×
955
                klog.Error(err)
×
956
                return err
×
957
        }
958

959
        klog.Infof("updating node networks for node %s", key)
×
960
        return c.config.UpdateNodeNetworks(node)
×
961
}
962

963
// Run starts controller
964
func (c *Controller) Run(stopCh <-chan struct{}) {
×
965
        defer utilruntime.HandleCrash()
×
966
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
967
        defer c.deleteProviderNetworkQueue.ShutDown()
×
968
        defer c.subnetQueue.ShutDown()
×
969
        defer c.serviceQueue.ShutDown()
×
970
        defer c.updatePodQueue.ShutDown()
×
971
        defer c.ipsecQueue.ShutDown()
×
972
        defer c.updateNodeQueue.ShutDown()
×
973
        defer c.vswitchClient.Close()
×
974

×
975
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
976
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
977
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
978

×
979
        if err := c.setIPSet(); err != nil {
×
980
                util.LogFatalAndExit(err, "failed to set ipsets")
×
981
        }
×
982

983
        klog.Info("Started workers")
×
984
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
985
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
986
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
987
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
988
        go wait.Until(c.runAddOrUpdateServiceWorker, time.Second, stopCh)
×
989
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
990
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
991
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
992
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
993
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
994
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
995
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
996
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
997
        go wait.Until(func() {
×
998
                if err := c.reconcileRouters(nil); err != nil {
×
999
                        klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
×
1000
                }
×
1001
        }, 3*time.Second, stopCh)
1002

1003
        if c.config.EnableTProxy {
×
1004
                go c.StartTProxyForwarding()
×
1005
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
1006
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
1007
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
1008
                // kubelet to tproxy, if probe success recover the iptable rules
×
1009
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
1010
        } else {
×
1011
                c.cleanTProxyConfig()
×
1012
        }
×
1013

1014
        if !c.config.EnableOVNIPSec {
×
1015
                if err := c.StopAndClearIPSecResource(); err != nil {
×
1016
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
1017
                }
×
1018
        }
1019

1020
        // Start OpenFlow sync loop
1021
        go c.runFlowSync(stopCh)
×
1022

×
1023
        // start fdb sync loop
×
1024
        go c.runFdbSync(stopCh)
×
1025

×
1026
        <-stopCh
×
1027
        klog.Info("Shutting down workers")
×
1028
}
1029

1030
func recompute() {
×
1031
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
1032
        if err != nil {
×
1033
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
1034
        }
×
1035
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc