• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21743647128

06 Feb 2026 08:17AM UTC coverage: 21.251% (-0.1%) from 21.364%
21743647128

push

github

zhangzujian
cni-server: add static fdb entry for subnets with u2o enabled (#6269)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

0 of 297 new or added lines in 10 files covered. (0.0%)

1 existing line in 1 file now uncovered.

10637 of 50053 relevant lines covered (21.25%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "context"
5
        "fmt"
6
        "slices"
7
        "strconv"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
13
        "github.com/scylladb/go-set/strset"
14
        v1 "k8s.io/api/core/v1"
15
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
        "k8s.io/apimachinery/pkg/util/wait"
19
        "k8s.io/client-go/informers"
20
        "k8s.io/client-go/kubernetes/scheme"
21
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
22
        listerv1 "k8s.io/client-go/listers/core/v1"
23
        "k8s.io/client-go/tools/cache"
24
        "k8s.io/client-go/tools/record"
25
        "k8s.io/client-go/util/workqueue"
26
        "k8s.io/klog/v2"
27
        k8sexec "k8s.io/utils/exec"
28

29
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
30
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
31
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
32
        "github.com/kubeovn/kube-ovn/pkg/ovs"
33
        "github.com/kubeovn/kube-ovn/pkg/util"
34
)
35

36
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
37
type Controller struct {
38
        config *Configuration
39

40
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
41
        providerNetworksSynced          cache.InformerSynced
42
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
43
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
44

45
        vlansLister kubeovnlister.VlanLister
46
        vlansSynced cache.InformerSynced
47

48
        subnetsLister kubeovnlister.SubnetLister
49
        subnetsSynced cache.InformerSynced
50
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
51

52
        ovnEipsLister kubeovnlister.OvnEipLister
53
        ovnEipsSynced cache.InformerSynced
54

55
        podsLister     listerv1.PodLister
56
        podsSynced     cache.InformerSynced
57
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
58

59
        nodesLister listerv1.NodeLister
60
        nodesSynced cache.InformerSynced
61

62
        servicesLister listerv1.ServiceLister
63
        servicesSynced cache.InformerSynced
64
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
65

66
        recorder record.EventRecorder
67

68
        protocol string
69

70
        ControllerRuntime
71
        localPodName   string
72
        localNamespace string
73

74
        k8sExec k8sexec.Interface
75

76
        // channel used for fdb sync
77
        fdbSyncChan   chan struct{}
78
        fdbSyncMutex  sync.Mutex
79
        vswitchClient ovs.Vswitch
80
}
81

82
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
83
        if rateLimiter == nil {
×
84
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
85
        }
×
86
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
87
}
88

89
// NewController init a daemon controller
90
func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) {
×
91
        eventBroadcaster := record.NewBroadcaster()
×
92
        eventBroadcaster.StartLogging(klog.Infof)
×
93
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
94
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
95
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
96
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
97
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
98
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
99
        podInformer := podInformerFactory.Core().V1().Pods()
×
100
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
101
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
102

×
103
        controller := &Controller{
×
104
                config: config,
×
105

×
106
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
107
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
108
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
109
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
110

×
111
                vlansLister: vlanInformer.Lister(),
×
112
                vlansSynced: vlanInformer.Informer().HasSynced,
×
113

×
114
                subnetsLister: subnetInformer.Lister(),
×
115
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
116
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
117

×
118
                ovnEipsLister: ovnEipInformer.Lister(),
×
119
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
120

×
121
                podsLister:     podInformer.Lister(),
×
122
                podsSynced:     podInformer.Informer().HasSynced,
×
123
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
124

×
125
                nodesLister: nodeInformer.Lister(),
×
126
                nodesSynced: nodeInformer.Informer().HasSynced,
×
127

×
128
                servicesLister: servicesInformer.Lister(),
×
129
                servicesSynced: servicesInformer.Informer().HasSynced,
×
130
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
131

×
132
                recorder: recorder,
×
133
                k8sExec:  k8sexec.New(),
×
NEW
134

×
NEW
135
                fdbSyncChan: make(chan struct{}, 1),
×
136
        }
×
137

×
138
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
139
        if err != nil {
×
140
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
141
        }
×
142
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
143

×
144
        if err = controller.initRuntime(); err != nil {
×
145
                return nil, err
×
146
        }
×
147

148
        podInformerFactory.Start(stopCh)
×
149
        nodeInformerFactory.Start(stopCh)
×
150
        kubeovnInformerFactory.Start(stopCh)
×
151

×
152
        if !cache.WaitForCacheSync(stopCh,
×
153
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
154
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced) {
×
155
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
156
        }
×
157

158
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
159
                AddFunc:    controller.enqueueAddProviderNetwork,
×
160
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
161
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
162
        }); err != nil {
×
163
                return nil, err
×
164
        }
×
165
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
166
                UpdateFunc: controller.enqueueUpdateVlan,
×
167
        }); err != nil {
×
168
                return nil, err
×
169
        }
×
170
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
171
                AddFunc:    controller.enqueueAddSubnet,
×
172
                UpdateFunc: controller.enqueueUpdateSubnet,
×
173
                DeleteFunc: controller.enqueueDeleteSubnet,
×
174
        }); err != nil {
×
175
                return nil, err
×
176
        }
×
177
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
178
                AddFunc:    controller.enqueueAddService,
×
179
                DeleteFunc: controller.enqueueDeleteService,
×
180
                UpdateFunc: controller.enqueueUpdateService,
×
181
        }); err != nil {
×
182
                util.LogFatalAndExit(err, "failed to add service event handler")
×
183
        }
×
184

185
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
186
                UpdateFunc: controller.enqueueUpdatePod,
×
187
        }); err != nil {
×
188
                return nil, err
×
189
        }
×
190

NEW
191
        if controller.vswitchClient, err = ovs.NewVswitchClient("unix:/var/run/openvswitch/db.sock", 1, 3); err != nil {
×
NEW
192
                return nil, fmt.Errorf("failed to create vswitch client: %w", err)
×
NEW
193
        }
×
194

UNCOV
195
        return controller, nil
×
196
}
197

198
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
199
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
200
        klog.V(3).Infof("enqueue add provider network %s", key)
×
201
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
202
}
×
203

204
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
205
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
206
        klog.V(3).Infof("enqueue update provider network %s", key)
×
207
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
208
}
×
209

210
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
211
        var pn *kubeovnv1.ProviderNetwork
×
212
        switch t := obj.(type) {
×
213
        case *kubeovnv1.ProviderNetwork:
×
214
                pn = t
×
215
        case cache.DeletedFinalStateUnknown:
×
216
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
217
                if !ok {
×
218
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
219
                        return
×
220
                }
×
221
                pn = p
×
222
        default:
×
223
                klog.Warningf("unexpected type: %T", obj)
×
224
                return
×
225
        }
226

227
        key := cache.MetaObjectToName(pn).String()
×
228
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
229
        c.deleteProviderNetworkQueue.Add(pn)
×
230
}
231

232
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
233
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
234
        }
×
235
}
236

237
func (c *Controller) runDeleteProviderNetworkWorker() {
×
238
        for c.processNextDeleteProviderNetworkWorkItem() {
×
239
        }
×
240
}
241

242
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
243
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
244
        if shutdown {
×
245
                return false
×
246
        }
×
247

248
        err := func(key string) error {
×
249
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
250
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
251
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
252
                }
×
253
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
254
                return nil
×
255
        }(key)
256
        if err != nil {
×
257
                utilruntime.HandleError(err)
×
258
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
259
                return true
×
260
        }
×
261
        return true
×
262
}
263

264
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
265
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
266
        if shutdown {
×
267
                return false
×
268
        }
×
269

270
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
271
                defer c.deleteProviderNetworkQueue.Done(obj)
×
272
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
273
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
274
                }
×
275
                c.deleteProviderNetworkQueue.Forget(obj)
×
276
                return nil
×
277
        }(obj)
278
        if err != nil {
×
279
                utilruntime.HandleError(err)
×
280
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
281
                return true
×
282
        }
×
283
        return true
×
284
}
285

286
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
287
        klog.V(3).Infof("handle update provider network %s", key)
×
288
        node, err := c.nodesLister.Get(c.config.NodeName)
×
289
        if err != nil {
×
290
                klog.Error(err)
×
291
                return err
×
292
        }
×
293
        pn, err := c.providerNetworksLister.Get(key)
×
294
        if err != nil {
×
295
                if k8serrors.IsNotFound(err) {
×
296
                        return nil
×
297
                }
×
298
                klog.Error(err)
×
299
                return err
×
300
        }
301

302
        if slices.Contains(pn.Spec.ExcludeNodes, node.Name) {
×
303
                c.recordProviderNetworkErr(pn.Name, "")
×
304
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
305
        }
×
306
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
307
}
308

309
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
310
        nic := pn.Spec.DefaultInterface
×
311
        for _, item := range pn.Spec.CustomInterfaces {
×
312
                if slices.Contains(item.Nodes, node.Name) {
×
313
                        nic = item.Interface
×
314
                        break
×
315
                }
316
        }
317

318
        patch := util.KVPatch{
×
319
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
320
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
321
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
322
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
323
        }
×
324

×
325
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
326
        for _, vlanName := range pn.Status.Vlans {
×
327
                vlan, err := c.vlansLister.Get(vlanName)
×
328
                if err != nil {
×
329
                        if k8serrors.IsNotFound(err) {
×
330
                                klog.Infof("vlan %s not found", vlanName)
×
331
                                continue
×
332
                        }
333
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
334
                        return err
×
335
                }
336
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
337
        }
338
        // always add trunk 0 so that the ovs bridge can communicate with the external network
339
        vlans.Add("0")
×
340

×
341
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
342
                parts := strings.SplitN(nic, ".", 2)
×
343
                parentIf := parts[0]
×
344
                if !util.CheckInterfaceExists(nic) {
×
345
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
346
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
347
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
348
                                return err
×
349
                        }
×
350
                } else {
×
351
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
352
                }
×
353
        }
354

355
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, nic); err != nil {
×
356
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
357
                return err
×
358
        }
×
359

360
        var mtu int
×
361
        var err error
×
362
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
363
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
364
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
365
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
366
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
367
                }
×
368
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
369
                return err
×
370
        }
371

372
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
373
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
374
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
375
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
376
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
377
                return err
×
378
        }
×
379
        c.recordProviderNetworkErr(pn.Name, "")
×
380
        return nil
×
381
}
382

383
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
384
        var currentPod *v1.Pod
×
385
        var err error
×
386
        if c.localPodName == "" {
×
387
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
388
                        LabelSelector: "app=kube-ovn-cni",
×
389
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
390
                })
×
391
                if err != nil {
×
392
                        klog.Errorf("failed to list pod: %v", err)
×
393
                        return
×
394
                }
×
395
                for _, pod := range pods.Items {
×
396
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
397
                                c.localPodName = pod.Name
×
398
                                c.localNamespace = pod.Namespace
×
399
                                currentPod = &pod
×
400
                                break
×
401
                        }
402
                }
403
                if currentPod == nil {
×
404
                        klog.Warning("failed to get self pod")
×
405
                        return
×
406
                }
×
407
        } else {
×
408
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
409
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
410
                        return
×
411
                }
×
412
        }
413

414
        patch := util.KVPatch{}
×
415
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
416
                if errMsg == "" {
×
417
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
418
                } else {
×
419
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
420
                }
×
421
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
422
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
423
                        return
×
424
                }
×
425
        }
426
}
427

428
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
429
        patch := util.KVPatch{
×
430
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
431
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
432
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
433
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
434
        }
×
435
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
436
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
437
                return err
×
438
        }
×
439

440
        return c.ovsCleanProviderNetwork(pn.Name)
×
441
}
442

443
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
444
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
445
                klog.Error(err)
×
446
                return err
×
447
        }
×
448

449
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, ""); err != nil {
×
450
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
451
                return err
×
452
        }
×
453

454
        node, err := c.nodesLister.Get(c.config.NodeName)
×
455
        if err != nil {
×
456
                klog.Error(err)
×
457
                return err
×
458
        }
×
459
        if len(node.Labels) == 0 {
×
460
                return nil
×
461
        }
×
462

463
        patch := util.KVPatch{
×
464
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
465
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
466
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
467
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
468
        }
×
469
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
470
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
471
                return err
×
472
        }
×
473

474
        return nil
×
475
}
476

477
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
478
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
479
        newVlan := newObj.(*kubeovnv1.Vlan)
×
480
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
481
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
482
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
483
        }
×
484
}
485

486
type subnetEvent struct {
487
        oldObj, newObj any
488
}
489

490
type serviceEvent struct {
491
        oldObj, newObj any
492
}
493

494
func (c *Controller) enqueueAddSubnet(obj any) {
×
495
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
496
}
×
497

498
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
499
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
500
}
×
501

502
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
503
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
504
}
×
505

506
func (c *Controller) runSubnetWorker() {
×
507
        for c.processNextSubnetWorkItem() {
×
508
        }
×
509
}
510

511
func (c *Controller) enqueueAddService(obj any) {
×
512
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
513
}
×
514

515
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
516
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
517
}
×
518

519
func (c *Controller) enqueueDeleteService(obj any) {
×
520
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
521
}
×
522

523
func (c *Controller) runAddOrUpdateServicekWorker() {
×
524
        for c.processNextServiceWorkItem() {
×
525
        }
×
526
}
527

528
func (c *Controller) processNextSubnetWorkItem() bool {
×
529
        obj, shutdown := c.subnetQueue.Get()
×
530
        if shutdown {
×
531
                return false
×
532
        }
×
533

534
        err := func(obj *subnetEvent) error {
×
535
                defer c.subnetQueue.Done(obj)
×
NEW
536
                c.requestFdbSync()
×
537
                if err := c.reconcileRouters(obj); err != nil {
×
538
                        c.subnetQueue.AddRateLimited(obj)
×
539
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
540
                }
×
541
                c.subnetQueue.Forget(obj)
×
542
                return nil
×
543
        }(obj)
544
        if err != nil {
×
545
                utilruntime.HandleError(err)
×
546
                return true
×
547
        }
×
548
        return true
×
549
}
550

551
func (c *Controller) processNextServiceWorkItem() bool {
×
552
        obj, shutdown := c.serviceQueue.Get()
×
553
        if shutdown {
×
554
                return false
×
555
        }
×
556

557
        err := func(obj *serviceEvent) error {
×
558
                defer c.serviceQueue.Done(obj)
×
559
                if err := c.reconcileServices(obj); err != nil {
×
560
                        c.serviceQueue.AddRateLimited(obj)
×
561
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
562
                }
×
563
                c.serviceQueue.Forget(obj)
×
564
                return nil
×
565
        }(obj)
566
        if err != nil {
×
567
                utilruntime.HandleError(err)
×
568
                return true
×
569
        }
×
570
        return true
×
571
}
572

573
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
574
        oldPod := oldObj.(*v1.Pod)
×
575
        newPod := newObj.(*v1.Pod)
×
576
        key := cache.MetaObjectToName(newPod).String()
×
577

×
578
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
579
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
580
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
581
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
582
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
583
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
584
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
585
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
586
                c.updatePodQueue.Add(key)
×
587
                return
×
588
        }
×
589

590
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
591
        if err != nil {
×
592
                return
×
593
        }
×
594
        for _, multiNet := range attachNets {
×
595
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
596
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
597
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
598
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
599
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
600
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
601
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
602
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
603
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
604
                                c.updatePodQueue.Add(key)
×
605
                        }
×
606
                }
607
        }
608
}
609

610
func (c *Controller) runUpdatePodWorker() {
×
611
        for c.processNextUpdatePodWorkItem() {
×
612
        }
×
613
}
614

615
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
616
        key, shutdown := c.updatePodQueue.Get()
×
617
        if shutdown {
×
618
                return false
×
619
        }
×
620

621
        err := func(key string) error {
×
622
                defer c.updatePodQueue.Done(key)
×
623
                if err := c.handleUpdatePod(key); err != nil {
×
624
                        c.updatePodQueue.AddRateLimited(key)
×
625
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
626
                }
×
627
                c.updatePodQueue.Forget(key)
×
628
                return nil
×
629
        }(key)
630
        if err != nil {
×
631
                utilruntime.HandleError(err)
×
632
                return true
×
633
        }
×
634
        return true
×
635
}
636

637
var lastNoPodOvsPort map[string]bool
638

639
func (c *Controller) markAndCleanInternalPort() error {
×
640
        klog.V(4).Infof("start to gc ovs internal ports")
×
641
        residualPorts := ovs.GetResidualInternalPorts()
×
642
        if len(residualPorts) == 0 {
×
643
                return nil
×
644
        }
×
645

646
        noPodOvsPort := map[string]bool{}
×
647
        for _, portName := range residualPorts {
×
648
                if !lastNoPodOvsPort[portName] {
×
649
                        noPodOvsPort[portName] = true
×
650
                } else {
×
651
                        klog.Infof("gc ovs internal port %s", portName)
×
652
                        // Remove ovs port
×
653
                        output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName)
×
654
                        if err != nil {
×
655
                                return fmt.Errorf("failed to delete ovs port %w, %q", err, output)
×
656
                        }
×
657
                }
658
        }
659
        lastNoPodOvsPort = noPodOvsPort
×
660

×
661
        return nil
×
662
}
663

664
// Run starts controller
665
func (c *Controller) Run(stopCh <-chan struct{}) {
×
666
        defer utilruntime.HandleCrash()
×
667
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
668
        defer c.deleteProviderNetworkQueue.ShutDown()
×
669
        defer c.subnetQueue.ShutDown()
×
670
        defer c.serviceQueue.ShutDown()
×
671
        defer c.updatePodQueue.ShutDown()
×
NEW
672
        defer c.vswitchClient.Close()
×
673

×
674
        go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
×
675
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
676
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
677

×
678
        if err := c.setIPSet(); err != nil {
×
679
                util.LogFatalAndExit(err, "failed to set ipsets")
×
680
        }
×
681

682
        klog.Info("Started workers")
×
683
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
684
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
685
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
686
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
687
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
688
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
689
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
690
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
691
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
692
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
693
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
694
        go wait.Until(func() {
×
695
                if err := c.reconcileRouters(nil); err != nil {
×
696
                        klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
×
697
                }
×
698
        }, 3*time.Second, stopCh)
699
        go wait.Until(func() {
×
700
                if err := c.markAndCleanInternalPort(); err != nil {
×
701
                        klog.Errorf("gc ovs port error: %v", err)
×
702
                }
×
703
        }, 5*time.Minute, stopCh)
704

705
        if c.config.EnableTProxy {
×
706
                go c.StartTProxyForwarding()
×
707
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
708
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
709
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
710
                // kubelet to tproxy, if probe success recover the iptable rules
×
711
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
712
        } else {
×
713
                c.cleanTProxyConfig()
×
714
        }
×
715

716
        if c.config.EnableOVNIPSec {
×
717
                go wait.Until(func() {
×
718
                        if err := c.ManageIPSecKeys(); err != nil {
×
719
                                klog.Errorf("manage ipsec keys error: %v", err)
×
720
                        }
×
721
                }, 24*time.Hour, stopCh)
722
        } else {
×
723
                if err := c.StopAndClearIPSecResouce(); err != nil {
×
724
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
725
                }
×
726
        }
727

728
        // Start OpenFlow sync loop
729
        go c.runFlowSync(stopCh)
×
730

×
NEW
731
        // start fdb sync loop
×
NEW
732
        go c.runFdbSync(stopCh)
×
NEW
733

×
734
        <-stopCh
×
735
        klog.Info("Shutting down workers")
×
736
}
737

738
func recompute() {
×
739
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
740
        if err != nil {
×
741
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
742
        }
×
743
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc