• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21705765236

05 Feb 2026 09:17AM UTC coverage: 22.964% (-0.07%) from 23.034%
21705765236

Pull #6269

github

zhangzujian
some fixes

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #6269: cni-server: add static fdb entry for subnets with u2o enabled

6 of 192 new or added lines in 4 files covered. (3.13%)

2 existing lines in 1 file now uncovered.

12442 of 54180 relevant lines covered (22.96%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        listerv1 "k8s.io/client-go/listers/core/v1"
26
        "k8s.io/client-go/tools/cache"
27
        "k8s.io/client-go/tools/record"
28
        "k8s.io/client-go/util/workqueue"
29
        "k8s.io/klog/v2"
30
        k8sexec "k8s.io/utils/exec"
31
        kubevirtv1 "kubevirt.io/api/core/v1"
32

33
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
34
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
35
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
36
        "github.com/kubeovn/kube-ovn/pkg/ovs"
37
        "github.com/kubeovn/kube-ovn/pkg/util"
38
)
39

40
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
41
type Controller struct {
42
        config *Configuration
43

44
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
45
        providerNetworksSynced          cache.InformerSynced
46
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
47
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
48

49
        vlansLister kubeovnlister.VlanLister
50
        vlansSynced cache.InformerSynced
51

52
        subnetsLister kubeovnlister.SubnetLister
53
        subnetsSynced cache.InformerSynced
54
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
55

56
        ovnEipsLister kubeovnlister.OvnEipLister
57
        ovnEipsSynced cache.InformerSynced
58

59
        podsLister     listerv1.PodLister
60
        podsSynced     cache.InformerSynced
61
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
62

63
        nodesLister     listerv1.NodeLister
64
        nodesSynced     cache.InformerSynced
65
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
66

67
        servicesLister listerv1.ServiceLister
68
        servicesSynced cache.InformerSynced
69
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
70

71
        caSecretLister listerv1.SecretLister
72
        caSecretSynced cache.InformerSynced
73
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
74

75
        recorder record.EventRecorder
76

77
        protocol string
78

79
        ControllerRuntime
80

81
        k8sExec k8sexec.Interface
82

83
        // channel used for fdb sync
84
        fdbSyncChan  chan struct{}
85
        fdbSyncMutex sync.Mutex
86
}
87

88
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
89
        if rateLimiter == nil {
×
90
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
91
        }
×
92
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
93
}
94

95
// NewController init a daemon controller
96
func NewController(config *Configuration,
97
        stopCh <-chan struct{},
98
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
99
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
100
) (*Controller, error) {
×
101
        eventBroadcaster := record.NewBroadcaster()
×
102
        eventBroadcaster.StartLogging(klog.Infof)
×
103
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
104
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
105
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
106
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
107
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
108
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
109
        podInformer := podInformerFactory.Core().V1().Pods()
×
110
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
111
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
112
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
113

×
114
        controller := &Controller{
×
115
                config: config,
×
116

×
117
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
118
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
119
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
120
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
121

×
122
                vlansLister: vlanInformer.Lister(),
×
123
                vlansSynced: vlanInformer.Informer().HasSynced,
×
124

×
125
                subnetsLister: subnetInformer.Lister(),
×
126
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
127
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
128

×
129
                ovnEipsLister: ovnEipInformer.Lister(),
×
130
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
131

×
132
                podsLister:     podInformer.Lister(),
×
133
                podsSynced:     podInformer.Informer().HasSynced,
×
134
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
135

×
136
                nodesLister:     nodeInformer.Lister(),
×
137
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
138
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
139

×
140
                servicesLister: servicesInformer.Lister(),
×
141
                servicesSynced: servicesInformer.Informer().HasSynced,
×
142
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
143

×
144
                caSecretLister: caSecretInformer.Lister(),
×
145
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
146
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
147

×
148
                recorder: recorder,
×
149
                k8sExec:  k8sexec.New(),
×
NEW
150

×
NEW
151
                fdbSyncChan: make(chan struct{}, 1),
×
152
        }
×
153

×
154
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
155
        if err != nil {
×
156
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
157
        }
×
158
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
159

×
160
        if err = controller.initRuntime(); err != nil {
×
161
                return nil, err
×
162
        }
×
163

164
        podInformerFactory.Start(stopCh)
×
165
        nodeInformerFactory.Start(stopCh)
×
166
        kubeovnInformerFactory.Start(stopCh)
×
167
        caSecretInformerFactory.Start(stopCh)
×
168

×
169
        if !cache.WaitForCacheSync(stopCh,
×
170
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
171
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
172
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
173
        }
×
174

175
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
176
                AddFunc:    controller.enqueueAddProviderNetwork,
×
177
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
178
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
179
        }); err != nil {
×
180
                return nil, err
×
181
        }
×
182
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
183
                UpdateFunc: controller.enqueueUpdateVlan,
×
184
        }); err != nil {
×
185
                return nil, err
×
186
        }
×
187
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
188
                AddFunc:    controller.enqueueAddSubnet,
×
189
                UpdateFunc: controller.enqueueUpdateSubnet,
×
190
                DeleteFunc: controller.enqueueDeleteSubnet,
×
191
        }); err != nil {
×
192
                return nil, err
×
193
        }
×
194
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
195
                AddFunc:    controller.enqueueAddService,
×
196
                DeleteFunc: controller.enqueueDeleteService,
×
197
                UpdateFunc: controller.enqueueUpdateService,
×
198
        }); err != nil {
×
199
                util.LogFatalAndExit(err, "failed to add service event handler")
×
200
        }
×
201

202
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
203
                UpdateFunc: controller.enqueueUpdatePod,
×
204
        }); err != nil {
×
205
                return nil, err
×
206
        }
×
207
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
208
                AddFunc:    controller.enqueueAddIPSecCA,
×
209
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
210
        }); err != nil {
×
211
                return nil, err
×
212
        }
×
213
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
214
                UpdateFunc: controller.enqueueUpdateNode,
×
215
        }); err != nil {
×
216
                return nil, err
×
217
        }
×
218

219
        return controller, nil
×
220
}
221

222
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
223
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
224
        klog.V(3).Infof("enqueue add CA %s", key)
×
225
        c.ipsecQueue.Add(key)
×
226
}
×
227

228
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
229
        oldSecret := oldObj.(*v1.Secret)
×
230
        newSecret := newObj.(*v1.Secret)
×
231
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
232
                // No changes in CA data, no need to enqueue
×
233
                return
×
234
        }
×
235

236
        key := cache.MetaObjectToName(newSecret).String()
×
237
        klog.V(3).Infof("enqueue update CA %s", key)
×
238
        c.ipsecQueue.Add(key)
×
239
}
240

241
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
242
        oldNode := oldObj.(*v1.Node)
×
243
        newNode := newObj.(*v1.Node)
×
244
        if newNode.Name != c.config.NodeName {
×
245
                return
×
246
        }
×
247
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
248
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
249
                c.updateNodeQueue.Add(newNode.Name)
×
250
        }
×
251
}
252

253
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
254
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
255
        klog.V(3).Infof("enqueue add provider network %s", key)
×
256
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
257
}
×
258

259
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
260
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
261
        klog.V(3).Infof("enqueue update provider network %s", key)
×
262
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
263
}
×
264

265
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
266
        var pn *kubeovnv1.ProviderNetwork
×
267
        switch t := obj.(type) {
×
268
        case *kubeovnv1.ProviderNetwork:
×
269
                pn = t
×
270
        case cache.DeletedFinalStateUnknown:
×
271
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
272
                if !ok {
×
273
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
274
                        return
×
275
                }
×
276
                pn = p
×
277
        default:
×
278
                klog.Warningf("unexpected type: %T", obj)
×
279
                return
×
280
        }
281

282
        key := cache.MetaObjectToName(pn).String()
×
283
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
284
        c.deleteProviderNetworkQueue.Add(pn)
×
285
}
286

287
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
288
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
289
        }
×
290
}
291

292
func (c *Controller) runDeleteProviderNetworkWorker() {
×
293
        for c.processNextDeleteProviderNetworkWorkItem() {
×
294
        }
×
295
}
296

297
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
298
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
299
        if shutdown {
×
300
                return false
×
301
        }
×
302

303
        err := func(key string) error {
×
304
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
305
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
306
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
307
                }
×
308
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
309
                return nil
×
310
        }(key)
311
        if err != nil {
×
312
                utilruntime.HandleError(err)
×
313
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
314
                return true
×
315
        }
×
316
        return true
×
317
}
318

319
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
320
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
321
        if shutdown {
×
322
                return false
×
323
        }
×
324

325
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
326
                defer c.deleteProviderNetworkQueue.Done(obj)
×
327
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
328
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
329
                }
×
330
                c.deleteProviderNetworkQueue.Forget(obj)
×
331
                return nil
×
332
        }(obj)
333
        if err != nil {
×
334
                utilruntime.HandleError(err)
×
335
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
336
                return true
×
337
        }
×
338
        return true
×
339
}
340

341
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
342
        klog.V(3).Infof("handle update provider network %s", key)
×
343
        node, err := c.nodesLister.Get(c.config.NodeName)
×
344
        if err != nil {
×
345
                klog.Error(err)
×
346
                return err
×
347
        }
×
348
        pn, err := c.providerNetworksLister.Get(key)
×
349
        if err != nil {
×
350
                if k8serrors.IsNotFound(err) {
×
351
                        return nil
×
352
                }
×
353
                klog.Error(err)
×
354
                return err
×
355
        }
356

357
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
358
        if err != nil {
×
359
                klog.Error(err)
×
360
                return err
×
361
        }
×
362

363
        if excluded {
×
364
                c.recordProviderNetworkErr(pn.Name, "")
×
365
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
366
        }
×
367
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
368
}
369

370
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
371
        nic := pn.Spec.DefaultInterface
×
372
        for _, item := range pn.Spec.CustomInterfaces {
×
373
                if slices.Contains(item.Nodes, node.Name) {
×
374
                        nic = item.Interface
×
375
                        break
×
376
                }
377
        }
378

379
        patch := util.KVPatch{
×
380
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
381
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
382
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
383
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
384
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
385
        }
×
386

×
387
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
388
        for _, vlanName := range pn.Status.Vlans {
×
389
                vlan, err := c.vlansLister.Get(vlanName)
×
390
                if err != nil {
×
391
                        if k8serrors.IsNotFound(err) {
×
392
                                klog.Infof("vlan %s not found", vlanName)
×
393
                                continue
×
394
                        }
395
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
396
                        return err
×
397
                }
398
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
399
        }
400
        // always add trunk 0 so that the ovs bridge can communicate with the external network
401
        vlans.Add("0")
×
402

×
403
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
404
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
405
                parts := strings.SplitN(nic, ".", 2)
×
406
                parentIf := parts[0]
×
407
                if !util.CheckInterfaceExists(nic) {
×
408
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
409
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
410
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
411
                                return err
×
412
                        }
×
413
                } else {
×
414
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
415
                }
×
416
        }
417

418
        // VLAN sub-interface handling - use map for efficiency
419
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
420

×
421
        // Process explicitly specified VLAN interfaces
×
422
        if len(pn.Spec.VlanInterfaces) > 0 {
×
423
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
424
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
425
                        if util.CheckInterfaceExists(vlanIfName) {
×
426
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
427
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
428
                                if err != nil {
×
429
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
430
                                        continue
×
431
                                }
432
                                vlanInterfaceMap[vlanIfName] = vlanID
×
433
                                vlans.Add(strconv.Itoa(vlanID))
×
434
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
435
                        } else {
×
436
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
437
                        }
×
438
                }
439
        }
440

441
        // Auto-detection of additional VLAN interfaces (if enabled)
442
        if pn.Spec.PreserveVlanInterfaces {
×
443
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
444
                vlanIDs := util.DetectVlanInterfaces(nic)
×
445
                for _, vlanID := range vlanIDs {
×
446
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
447
                        // Only add if not already explicitly specified
×
448
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
449
                                vlanInterfaceMap[vlanIfName] = vlanID
×
450
                                vlans.Add(strconv.Itoa(vlanID))
×
451
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
452
                        } else {
×
453
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
454
                        }
×
455
                }
456
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
457
        }
458

459
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, nic, vlanInterfaceMap); err != nil {
×
460
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
461
                return err
×
462
        }
×
463

464
        var mtu int
×
465
        var err error
×
466
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
467
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
468
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
469
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
470
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
471
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
472
                }
×
473
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
474
                return err
×
475
        }
476

477
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
478
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
479
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
480
        if len(vlanInterfaceMap) > 0 {
×
481
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
482
        }
×
483
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
484
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
485
                return err
×
486
        }
×
487
        c.recordProviderNetworkErr(pn.Name, "")
×
488
        return nil
×
489
}
490

491
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
492
        pod, err := c.podsLister.Pods(c.config.PodNamespace).Get(c.config.PodName)
×
493
        if err != nil {
×
494
                klog.Errorf("failed to get pod %s/%s, %v", c.config.PodNamespace, c.config.PodName, err)
×
495
                return
×
496
        }
×
497

498
        patch := util.KVPatch{}
×
499
        if pod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
500
                if errMsg == "" {
×
501
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
502
                } else {
×
503
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
504
                }
×
505
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
506
                        klog.Errorf("failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
507
                        return
×
508
                }
×
509
        }
510
}
511

512
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
513
        patch := util.KVPatch{
×
514
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
515
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
516
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
517
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
518
        }
×
519
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
520
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
521
                return err
×
522
        }
×
523

524
        return c.ovsCleanProviderNetwork(pn.Name)
×
525
}
526

527
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
528
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
529
                klog.Error(err)
×
530
                return err
×
531
        }
×
532

533
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil); err != nil {
×
534
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
535
                return err
×
536
        }
×
537

538
        node, err := c.nodesLister.Get(c.config.NodeName)
×
539
        if err != nil {
×
540
                klog.Error(err)
×
541
                return err
×
542
        }
×
543
        if len(node.Labels) == 0 {
×
544
                return nil
×
545
        }
×
546

547
        patch := util.KVPatch{
×
548
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
549
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
550
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
551
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
552
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
553
        }
×
554
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
555
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
556
                return err
×
557
        }
×
558

559
        return nil
×
560
}
561

562
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
563
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
564
        newVlan := newObj.(*kubeovnv1.Vlan)
×
565
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
566
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
567
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
568
        }
×
569
}
570

571
// subnetEvent is the work item carried by the subnet queue. Add events set
// only newObj, delete events set only oldObj, and update events set both.
type subnetEvent struct {
	oldObj any
	newObj any
}
574

575
// serviceEvent is the work item carried by the service queue. Add events set
// only newObj, delete events set only oldObj, and update events set both.
type serviceEvent struct {
	oldObj any
	newObj any
}
578

579
func (c *Controller) enqueueAddSubnet(obj any) {
×
580
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
581
}
×
582

583
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
584
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
585
}
×
586

587
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
588
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
589
}
×
590

591
func (c *Controller) runSubnetWorker() {
×
592
        for c.processNextSubnetWorkItem() {
×
593
        }
×
594
}
595

596
func (c *Controller) enqueueAddService(obj any) {
×
597
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
598
}
×
599

600
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
601
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
602
}
×
603

604
func (c *Controller) enqueueDeleteService(obj any) {
×
605
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
606
}
×
607

608
func (c *Controller) runAddOrUpdateServiceWorker() {
×
609
        for c.processNextServiceWorkItem() {
×
610
        }
×
611
}
612

613
func (c *Controller) processNextSubnetWorkItem() bool {
×
614
        obj, shutdown := c.subnetQueue.Get()
×
615
        if shutdown {
×
616
                return false
×
617
        }
×
618

619
        err := func(obj *subnetEvent) error {
×
620
                defer c.subnetQueue.Done(obj)
×
NEW
621
                c.requestFdbSync()
×
622
                if err := c.reconcileRouters(obj); err != nil {
×
623
                        c.subnetQueue.AddRateLimited(obj)
×
624
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
625
                }
×
626
                c.subnetQueue.Forget(obj)
×
627
                return nil
×
628
        }(obj)
629
        if err != nil {
×
630
                utilruntime.HandleError(err)
×
631
                return true
×
632
        }
×
633
        return true
×
634
}
635

636
func (c *Controller) processNextServiceWorkItem() bool {
×
637
        obj, shutdown := c.serviceQueue.Get()
×
638
        if shutdown {
×
639
                return false
×
640
        }
×
641

642
        err := func(obj *serviceEvent) error {
×
643
                defer c.serviceQueue.Done(obj)
×
644
                if err := c.reconcileServices(obj); err != nil {
×
645
                        c.serviceQueue.AddRateLimited(obj)
×
646
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
647
                }
×
648
                c.serviceQueue.Forget(obj)
×
649
                return nil
×
650
        }(obj)
651
        if err != nil {
×
652
                utilruntime.HandleError(err)
×
653
                return true
×
654
        }
×
655
        return true
×
656
}
657

658
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
659
        oldPod := oldObj.(*v1.Pod)
×
660
        newPod := newObj.(*v1.Pod)
×
661
        key := cache.MetaObjectToName(newPod).String()
×
662

×
663
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
664
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
665
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
666
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
667
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
668
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
669
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
670
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
671
                c.updatePodQueue.Add(key)
×
672
                return
×
673
        }
×
674

675
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
676
        if err != nil {
×
677
                return
×
678
        }
×
679
        for _, multiNet := range attachNets {
×
680
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
681
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
682
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
683
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
684
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
685
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
686
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
687
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
688
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
689
                                c.updatePodQueue.Add(key)
×
690
                        }
×
691
                }
692
        }
693
}
694

695
func (c *Controller) runUpdatePodWorker() {
×
696
        for c.processNextUpdatePodWorkItem() {
×
697
        }
×
698
}
699

700
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
701
        key, shutdown := c.updatePodQueue.Get()
×
702
        if shutdown {
×
703
                return false
×
704
        }
×
705

706
        err := func(key string) error {
×
707
                defer c.updatePodQueue.Done(key)
×
708
                if err := c.handleUpdatePod(key); err != nil {
×
709
                        c.updatePodQueue.AddRateLimited(key)
×
710
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
711
                }
×
712
                c.updatePodQueue.Forget(key)
×
713
                return nil
×
714
        }(key)
715
        if err != nil {
×
716
                utilruntime.HandleError(err)
×
717
                return true
×
718
        }
×
719
        return true
×
720
}
721

722
func (c *Controller) gcInterfaces() {
×
723
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
724
        if err != nil {
×
725
                klog.Errorf("failed to list interface pod map: %v", err)
×
726
                return
×
727
        }
×
728
        for iface, pod := range interfacePodMap {
×
729
                parts := strings.Split(pod, "/")
×
730
                if len(parts) < 3 {
×
731
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
732
                        continue
×
733
                }
734

735
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
736
                if strings.Contains(errText, "No such device") {
×
737
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
738
                        if err := ovs.CleanInterface(iface); err != nil {
×
739
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
740
                        }
×
741
                        continue
×
742
                }
743

744
                if _, err = c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
745
                        if !k8serrors.IsNotFound(err) {
×
746
                                klog.Errorf("failed to get pod %s/%s: %v", podNamespace, podName, err)
×
747
                                continue
×
748
                        }
749

750
                        // Pod not found by name. Check if this might be a KubeVirt VM.
751
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VM name (not the launcher pod name).
752
                        // The actual launcher pod has the label 'vm.kubevirt.io/name' with the VM name as value.
753
                        // Try to find launcher pods by this label.
754
                        selector := labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: podName})
×
755
                        launcherPods, err := c.podsLister.Pods(podNamespace).List(selector)
×
756
                        if err != nil {
×
757
                                klog.Errorf("failed to list launcher pods for vm %s/%s: %v", podNamespace, podName, err)
×
758
                                continue
×
759
                        }
760

761
                        // If we found launcher pod(s) for this VM, keep the interface
762
                        if len(launcherPods) > 0 {
×
763
                                klog.V(5).Infof("found %d launcher pod(s) for vm %s/%s, keeping ovs interface %s",
×
764
                                        len(launcherPods), podNamespace, podName, iface)
×
765
                                continue
×
766
                        }
767

768
                        // No pod on this node and no launcher pod found - safe to delete
769
                        klog.Infof("pod %s/%s not found on this node, delete ovs interface %s", podNamespace, podName, iface)
×
770
                        if err = ovs.CleanInterface(iface); err != nil {
×
771
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
772
                        }
×
773
                }
774
        }
775
}
776

777
func (c *Controller) runIPSecWorker() {
×
778
        if err := c.StartIPSecService(); err != nil {
×
779
                klog.Errorf("starting ipsec service: %v", err)
×
780
        }
×
781

782
        for c.processNextIPSecWorkItem() {
×
783
        }
×
784
}
785

786
func (c *Controller) processNextIPSecWorkItem() bool {
×
787
        key, shutdown := c.ipsecQueue.Get()
×
788
        if shutdown {
×
789
                return false
×
790
        }
×
791
        defer c.ipsecQueue.Done(key)
×
792

×
793
        err := func(key string) error {
×
794
                if err := c.SyncIPSecKeys(key); err != nil {
×
795
                        c.ipsecQueue.AddRateLimited(key)
×
796
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
797
                }
×
798
                c.ipsecQueue.Forget(key)
×
799
                return nil
×
800
        }(key)
801
        if err != nil {
×
802
                utilruntime.HandleError(err)
×
803
                return true
×
804
        }
×
805
        return true
×
806
}
807

808
func (c *Controller) runUpdateNodeWorker() {
×
809
        for c.processNextUpdateNodeWorkItem() {
×
810
        }
×
811
}
812

813
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
814
        key, shutdown := c.updateNodeQueue.Get()
×
815
        if shutdown {
×
816
                return false
×
817
        }
×
818

819
        err := func(key string) error {
×
820
                defer c.updateNodeQueue.Done(key)
×
821
                if err := c.handleUpdateNode(key); err != nil {
×
822
                        c.updateNodeQueue.AddRateLimited(key)
×
823
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
824
                }
×
825
                c.updateNodeQueue.Forget(key)
×
826
                return nil
×
827
        }(key)
828
        if err != nil {
×
829
                utilruntime.HandleError(err)
×
830
                return true
×
831
        }
×
832
        return true
×
833
}
834

835
func (c *Controller) handleUpdateNode(key string) error {
×
836
        node, err := c.nodesLister.Get(key)
×
837
        if err != nil {
×
838
                if k8serrors.IsNotFound(err) {
×
839
                        return nil
×
840
                }
×
841
                klog.Error(err)
×
842
                return err
×
843
        }
844

845
        klog.Infof("updating node networks for node %s", key)
×
846
        return c.config.UpdateNodeNetworks(node)
×
847
}
848

849
// Run starts controller
850
func (c *Controller) Run(stopCh <-chan struct{}) {
×
851
        defer utilruntime.HandleCrash()
×
852
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
853
        defer c.deleteProviderNetworkQueue.ShutDown()
×
854
        defer c.subnetQueue.ShutDown()
×
855
        defer c.serviceQueue.ShutDown()
×
856
        defer c.updatePodQueue.ShutDown()
×
857
        defer c.ipsecQueue.ShutDown()
×
858
        defer c.updateNodeQueue.ShutDown()
×
859
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
860
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
861
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
862

×
863
        if err := c.setIPSet(); err != nil {
×
864
                util.LogFatalAndExit(err, "failed to set ipsets")
×
865
        }
×
866

867
        klog.Info("Started workers")
×
868
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
869
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
870
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
871
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
872
        go wait.Until(c.runAddOrUpdateServiceWorker, time.Second, stopCh)
×
873
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
874
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
875
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
876
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
877
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
878
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
879
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
880
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
881
        go wait.Until(func() {
×
882
                if err := c.reconcileRouters(nil); err != nil {
×
883
                        klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
×
884
                }
×
885
        }, 3*time.Second, stopCh)
886

887
        if c.config.EnableTProxy {
×
888
                go c.StartTProxyForwarding()
×
889
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
890
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
891
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
892
                // kubelet to tproxy, if probe success recover the iptable rules
×
893
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
894
        } else {
×
895
                c.cleanTProxyConfig()
×
896
        }
×
897

898
        if !c.config.EnableOVNIPSec {
×
899
                if err := c.StopAndClearIPSecResource(); err != nil {
×
900
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
901
                }
×
902
        }
903

904
        // Start OpenFlow sync loop
905
        go c.runFlowSync(stopCh)
×
906

×
NEW
907
        // start fdb sync loop
×
NEW
908
        go c.runFdbSync(stopCh)
×
NEW
909

×
910
        <-stopCh
×
911
        klog.Info("Shutting down workers")
×
912
}
913

914
func recompute() {
×
915
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
916
        if err != nil {
×
917
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
918
        }
×
919
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc