• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 19923286340

04 Dec 2025 08:59AM UTC coverage: 22.198% (-0.06%) from 22.257%
19923286340

push

github

web-flow
add auto create vlan sub interface (#5966)

Signed-off-by: clyi <clyi@alauda.io>

0 of 137 new or added lines in 3 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

11537 of 51973 relevant lines covered (22.2%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        listerv1 "k8s.io/client-go/listers/core/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        k8sexec "k8s.io/utils/exec"
30

31
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
32
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
33
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
34
        "github.com/kubeovn/kube-ovn/pkg/ovs"
35
        "github.com/kubeovn/kube-ovn/pkg/util"
36
)
37

38
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
39
type Controller struct {
40
        config *Configuration
41

42
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
43
        providerNetworksSynced          cache.InformerSynced
44
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
45
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
46

47
        vlansLister kubeovnlister.VlanLister
48
        vlansSynced cache.InformerSynced
49

50
        subnetsLister kubeovnlister.SubnetLister
51
        subnetsSynced cache.InformerSynced
52
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
53

54
        ovnEipsLister kubeovnlister.OvnEipLister
55
        ovnEipsSynced cache.InformerSynced
56

57
        podsLister     listerv1.PodLister
58
        podsSynced     cache.InformerSynced
59
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
60
        deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent]
61

62
        nodesLister     listerv1.NodeLister
63
        nodesSynced     cache.InformerSynced
64
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
65

66
        servicesLister listerv1.ServiceLister
67
        servicesSynced cache.InformerSynced
68
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
69

70
        caSecretLister listerv1.SecretLister
71
        caSecretSynced cache.InformerSynced
72
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
73

74
        recorder record.EventRecorder
75

76
        protocol string
77

78
        ControllerRuntime
79
        localPodName   string
80
        localNamespace string
81

82
        k8sExec k8sexec.Interface
83
}
84

85
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
86
        if rateLimiter == nil {
×
87
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
88
        }
×
89
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
90
}
91

92
// NewController init a daemon controller
93
func NewController(config *Configuration,
94
        stopCh <-chan struct{},
95
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
96
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
97
) (*Controller, error) {
×
98
        eventBroadcaster := record.NewBroadcaster()
×
99
        eventBroadcaster.StartLogging(klog.Infof)
×
100
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
101
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
102
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
103
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
104
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
105
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
106
        podInformer := podInformerFactory.Core().V1().Pods()
×
107
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
108
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
109
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
110

×
111
        controller := &Controller{
×
112
                config: config,
×
113

×
114
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
115
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
116
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
117
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
118

×
119
                vlansLister: vlanInformer.Lister(),
×
120
                vlansSynced: vlanInformer.Informer().HasSynced,
×
121

×
122
                subnetsLister: subnetInformer.Lister(),
×
123
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
124
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
125

×
126
                ovnEipsLister: ovnEipInformer.Lister(),
×
127
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
128

×
129
                podsLister:     podInformer.Lister(),
×
130
                podsSynced:     podInformer.Informer().HasSynced,
×
131
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
132
                deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil),
×
133

×
134
                nodesLister:     nodeInformer.Lister(),
×
135
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
136
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
137

×
138
                servicesLister: servicesInformer.Lister(),
×
139
                servicesSynced: servicesInformer.Informer().HasSynced,
×
140
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
141

×
142
                caSecretLister: caSecretInformer.Lister(),
×
143
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
144
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
145

×
146
                recorder: recorder,
×
147
                k8sExec:  k8sexec.New(),
×
148
        }
×
149

×
150
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
151
        if err != nil {
×
152
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
153
        }
×
154
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
155

×
156
        if err = controller.initRuntime(); err != nil {
×
157
                return nil, err
×
158
        }
×
159

160
        podInformerFactory.Start(stopCh)
×
161
        nodeInformerFactory.Start(stopCh)
×
162
        kubeovnInformerFactory.Start(stopCh)
×
163
        caSecretInformerFactory.Start(stopCh)
×
164

×
165
        if !cache.WaitForCacheSync(stopCh,
×
166
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
167
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
168
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
169
        }
×
170

171
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
172
                AddFunc:    controller.enqueueAddProviderNetwork,
×
173
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
174
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
175
        }); err != nil {
×
176
                return nil, err
×
177
        }
×
178
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
179
                UpdateFunc: controller.enqueueUpdateVlan,
×
180
        }); err != nil {
×
181
                return nil, err
×
182
        }
×
183
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
184
                AddFunc:    controller.enqueueAddSubnet,
×
185
                UpdateFunc: controller.enqueueUpdateSubnet,
×
186
                DeleteFunc: controller.enqueueDeleteSubnet,
×
187
        }); err != nil {
×
188
                return nil, err
×
189
        }
×
190
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
191
                AddFunc:    controller.enqueueAddService,
×
192
                DeleteFunc: controller.enqueueDeleteService,
×
193
                UpdateFunc: controller.enqueueUpdateService,
×
194
        }); err != nil {
×
195
                util.LogFatalAndExit(err, "failed to add service event handler")
×
196
        }
×
197

198
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
199
                UpdateFunc: controller.enqueueUpdatePod,
×
200
                DeleteFunc: controller.enqueueDeletePod,
×
201
        }); err != nil {
×
202
                return nil, err
×
203
        }
×
204
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
205
                AddFunc:    controller.enqueueAddIPSecCA,
×
206
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
207
        }); err != nil {
×
208
                return nil, err
×
209
        }
×
210
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
211
                UpdateFunc: controller.enqueueUpdateNode,
×
212
        }); err != nil {
×
213
                return nil, err
×
214
        }
×
215

216
        return controller, nil
×
217
}
218

219
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
220
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
221
        klog.V(3).Infof("enqueue add CA %s", key)
×
222
        c.ipsecQueue.Add(key)
×
223
}
×
224

225
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
226
        oldSecret := oldObj.(*v1.Secret)
×
227
        newSecret := newObj.(*v1.Secret)
×
228
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
229
                // No changes in CA data, no need to enqueue
×
230
                return
×
231
        }
×
232

233
        key := cache.MetaObjectToName(newSecret).String()
×
234
        klog.V(3).Infof("enqueue update CA %s", key)
×
235
        c.ipsecQueue.Add(key)
×
236
}
237

238
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
239
        oldNode := oldObj.(*v1.Node)
×
240
        newNode := newObj.(*v1.Node)
×
241
        if newNode.Name != c.config.NodeName {
×
242
                return
×
243
        }
×
244
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
245
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
246
                c.updateNodeQueue.Add(newNode.Name)
×
247
        }
×
248
}
249

250
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
251
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
252
        klog.V(3).Infof("enqueue add provider network %s", key)
×
253
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
254
}
×
255

256
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
257
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
258
        klog.V(3).Infof("enqueue update provider network %s", key)
×
259
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
260
}
×
261

262
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
263
        var pn *kubeovnv1.ProviderNetwork
×
264
        switch t := obj.(type) {
×
265
        case *kubeovnv1.ProviderNetwork:
×
266
                pn = t
×
267
        case cache.DeletedFinalStateUnknown:
×
268
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
269
                if !ok {
×
270
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
271
                        return
×
272
                }
×
273
                pn = p
×
274
        default:
×
275
                klog.Warningf("unexpected type: %T", obj)
×
276
                return
×
277
        }
278

279
        key := cache.MetaObjectToName(pn).String()
×
280
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
281
        c.deleteProviderNetworkQueue.Add(pn)
×
282
}
283

284
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
285
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
286
        }
×
287
}
288

289
func (c *Controller) runDeleteProviderNetworkWorker() {
×
290
        for c.processNextDeleteProviderNetworkWorkItem() {
×
291
        }
×
292
}
293

294
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
295
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
296
        if shutdown {
×
297
                return false
×
298
        }
×
299

300
        err := func(key string) error {
×
301
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
302
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
303
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
304
                }
×
305
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
306
                return nil
×
307
        }(key)
308
        if err != nil {
×
309
                utilruntime.HandleError(err)
×
310
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
311
                return true
×
312
        }
×
313
        return true
×
314
}
315

316
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
317
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
318
        if shutdown {
×
319
                return false
×
320
        }
×
321

322
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
323
                defer c.deleteProviderNetworkQueue.Done(obj)
×
324
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
325
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
326
                }
×
327
                c.deleteProviderNetworkQueue.Forget(obj)
×
328
                return nil
×
329
        }(obj)
330
        if err != nil {
×
331
                utilruntime.HandleError(err)
×
332
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
333
                return true
×
334
        }
×
335
        return true
×
336
}
337

338
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
339
        klog.V(3).Infof("handle update provider network %s", key)
×
340
        node, err := c.nodesLister.Get(c.config.NodeName)
×
341
        if err != nil {
×
342
                klog.Error(err)
×
343
                return err
×
344
        }
×
345
        pn, err := c.providerNetworksLister.Get(key)
×
346
        if err != nil {
×
347
                if k8serrors.IsNotFound(err) {
×
348
                        return nil
×
349
                }
×
350
                klog.Error(err)
×
351
                return err
×
352
        }
353

354
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
355
        if err != nil {
×
356
                klog.Error(err)
×
357
                return err
×
358
        }
×
359

360
        if excluded {
×
361
                c.recordProviderNetworkErr(pn.Name, "")
×
362
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
363
        }
×
364
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
365
}
366

367
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
368
        nic := pn.Spec.DefaultInterface
×
369
        for _, item := range pn.Spec.CustomInterfaces {
×
370
                if slices.Contains(item.Nodes, node.Name) {
×
371
                        nic = item.Interface
×
372
                        break
×
373
                }
374
        }
375

376
        patch := util.KVPatch{
×
377
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
378
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
379
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
380
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
381
        }
×
382

×
383
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
384
        for _, vlanName := range pn.Status.Vlans {
×
385
                vlan, err := c.vlansLister.Get(vlanName)
×
386
                if err != nil {
×
387
                        if k8serrors.IsNotFound(err) {
×
388
                                klog.Infof("vlan %s not found", vlanName)
×
389
                                continue
×
390
                        }
391
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
392
                        return err
×
393
                }
394
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
395
        }
396
        // always add trunk 0 so that the ovs bridge can communicate with the external network
397
        vlans.Add("0")
×
398

×
NEW
399
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
NEW
400
                parts := strings.SplitN(nic, ".", 2)
×
NEW
401
                parentIf := parts[0]
×
NEW
402
                if !util.CheckInterfaceExists(nic) {
×
NEW
403
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
NEW
404
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
NEW
405
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
NEW
406
                                return err
×
NEW
407
                        }
×
NEW
408
                } else {
×
NEW
409
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
NEW
410
                }
×
411
        }
412

413
        var mtu int
×
414
        var err error
×
415
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
416
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
417
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
418
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
419
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
420
                }
×
421
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
422
                return err
×
423
        }
424

425
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
426
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
427
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
428
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
429
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
430
                return err
×
431
        }
×
432
        c.recordProviderNetworkErr(pn.Name, "")
×
433
        return nil
×
434
}
435

436
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
437
        var currentPod *v1.Pod
×
438
        var err error
×
439
        if c.localPodName == "" {
×
440
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
441
                        LabelSelector: "app=kube-ovn-cni",
×
442
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
443
                })
×
444
                if err != nil {
×
445
                        klog.Errorf("failed to list pod: %v", err)
×
446
                        return
×
447
                }
×
448
                for _, pod := range pods.Items {
×
449
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
450
                                c.localPodName = pod.Name
×
451
                                c.localNamespace = pod.Namespace
×
452
                                currentPod = &pod
×
453
                                break
×
454
                        }
455
                }
456
                if currentPod == nil {
×
457
                        klog.Warning("failed to get self pod")
×
458
                        return
×
459
                }
×
460
        } else {
×
461
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
462
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
463
                        return
×
464
                }
×
465
        }
466

467
        patch := util.KVPatch{}
×
468
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
469
                if errMsg == "" {
×
470
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
471
                } else {
×
472
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
473
                }
×
474
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
475
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
476
                        return
×
477
                }
×
478
        }
479
}
480

481
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
482
        patch := util.KVPatch{
×
483
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
484
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
485
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
486
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
487
        }
×
488
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
489
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
490
                return err
×
491
        }
×
492

493
        return c.ovsCleanProviderNetwork(pn.Name)
×
494
}
495

496
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
497
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
498
                klog.Error(err)
×
499
                return err
×
500
        }
×
501

NEW
502
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name); err != nil {
×
NEW
503
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
NEW
504
                return err
×
NEW
505
        }
×
506

507
        node, err := c.nodesLister.Get(c.config.NodeName)
×
508
        if err != nil {
×
509
                klog.Error(err)
×
510
                return err
×
511
        }
×
512
        if len(node.Labels) == 0 {
×
513
                return nil
×
514
        }
×
515

516
        patch := util.KVPatch{
×
517
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
518
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
519
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
520
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
521
        }
×
522
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
523
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
524
                return err
×
525
        }
×
526

527
        return nil
×
528
}
529

530
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
531
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
532
        newVlan := newObj.(*kubeovnv1.Vlan)
×
533
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
534
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
535
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
536
        }
×
537
}
538

539
type subnetEvent struct {
540
        oldObj, newObj any
541
}
542

543
type serviceEvent struct {
544
        oldObj, newObj any
545
}
546

547
type podEvent struct {
548
        oldObj any
549
}
550

551
func (c *Controller) enqueueAddSubnet(obj any) {
×
552
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
553
}
×
554

555
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
556
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
557
}
×
558

559
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
560
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
561
}
×
562

563
func (c *Controller) runSubnetWorker() {
×
564
        for c.processNextSubnetWorkItem() {
×
565
        }
×
566
}
567

568
func (c *Controller) enqueueAddService(obj any) {
×
569
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
570
}
×
571

572
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
573
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
574
}
×
575

576
func (c *Controller) enqueueDeleteService(obj any) {
×
577
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
578
}
×
579

580
func (c *Controller) runAddOrUpdateServicekWorker() {
×
581
        for c.processNextServiceWorkItem() {
×
582
        }
×
583
}
584

585
func (c *Controller) processNextSubnetWorkItem() bool {
×
586
        obj, shutdown := c.subnetQueue.Get()
×
587
        if shutdown {
×
588
                return false
×
589
        }
×
590

591
        err := func(obj *subnetEvent) error {
×
592
                defer c.subnetQueue.Done(obj)
×
593
                if err := c.reconcileRouters(obj); err != nil {
×
594
                        c.subnetQueue.AddRateLimited(obj)
×
595
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
596
                }
×
597
                c.subnetQueue.Forget(obj)
×
598
                return nil
×
599
        }(obj)
600
        if err != nil {
×
601
                utilruntime.HandleError(err)
×
602
                return true
×
603
        }
×
604
        return true
×
605
}
606

607
func (c *Controller) processNextServiceWorkItem() bool {
×
608
        obj, shutdown := c.serviceQueue.Get()
×
609
        if shutdown {
×
610
                return false
×
611
        }
×
612

613
        err := func(obj *serviceEvent) error {
×
614
                defer c.serviceQueue.Done(obj)
×
615
                if err := c.reconcileServices(obj); err != nil {
×
616
                        c.serviceQueue.AddRateLimited(obj)
×
617
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
618
                }
×
619
                c.serviceQueue.Forget(obj)
×
620
                return nil
×
621
        }(obj)
622
        if err != nil {
×
623
                utilruntime.HandleError(err)
×
624
                return true
×
625
        }
×
626
        return true
×
627
}
628

629
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
630
        oldPod := oldObj.(*v1.Pod)
×
631
        newPod := newObj.(*v1.Pod)
×
632
        key := cache.MetaObjectToName(newPod).String()
×
633

×
634
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
635
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
636
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
637
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
638
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
639
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
640
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
641
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
642
                c.updatePodQueue.Add(key)
×
643
                return
×
644
        }
×
645

646
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
647
        if err != nil {
×
648
                return
×
649
        }
×
650
        for _, multiNet := range attachNets {
×
651
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
652
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
653
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
654
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
655
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
656
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
657
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
658
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
659
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
660
                                c.updatePodQueue.Add(key)
×
661
                        }
×
662
                }
663
        }
664
}
665

666
func (c *Controller) enqueueDeletePod(obj any) {
×
667
        var pod *v1.Pod
×
668
        switch t := obj.(type) {
×
669
        case *v1.Pod:
×
670
                pod = t
×
671
        case cache.DeletedFinalStateUnknown:
×
672
                p, ok := t.Obj.(*v1.Pod)
×
673
                if !ok {
×
674
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
675
                        return
×
676
                }
×
677
                pod = p
×
678
        default:
×
679
                klog.Warningf("unexpected type: %T", obj)
×
680
                return
×
681
        }
682

683
        klog.V(3).Infof("enqueue delete pod %s", pod.Name)
×
684
        c.deletePodQueue.Add(&podEvent{oldObj: pod})
×
685
}
686

687
func (c *Controller) runUpdatePodWorker() {
×
688
        for c.processNextUpdatePodWorkItem() {
×
689
        }
×
690
}
691

692
func (c *Controller) runDeletePodWorker() {
×
693
        for c.processNextDeletePodWorkItem() {
×
694
        }
×
695
}
696

697
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
698
        key, shutdown := c.updatePodQueue.Get()
×
699
        if shutdown {
×
700
                return false
×
701
        }
×
702

703
        err := func(key string) error {
×
704
                defer c.updatePodQueue.Done(key)
×
705
                if err := c.handleUpdatePod(key); err != nil {
×
706
                        c.updatePodQueue.AddRateLimited(key)
×
707
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
708
                }
×
709
                c.updatePodQueue.Forget(key)
×
710
                return nil
×
711
        }(key)
712
        if err != nil {
×
713
                utilruntime.HandleError(err)
×
714
                return true
×
715
        }
×
716
        return true
×
717
}
718

719
func (c *Controller) processNextDeletePodWorkItem() bool {
×
720
        event, shutdown := c.deletePodQueue.Get()
×
721
        if shutdown {
×
722
                return false
×
723
        }
×
724

725
        err := func(event *podEvent) error {
×
726
                defer c.deletePodQueue.Done(event)
×
727
                if err := c.handleDeletePod(event); err != nil {
×
728
                        c.deletePodQueue.AddRateLimited(event)
×
729
                        return fmt.Errorf("error syncing pod event: %w, requeuing", err)
×
730
                }
×
731
                c.deletePodQueue.Forget(event)
×
732
                return nil
×
733
        }(event)
734
        if err != nil {
×
735
                utilruntime.HandleError(err)
×
736
                return true
×
737
        }
×
738
        return true
×
739
}
740

741
func (c *Controller) gcInterfaces() {
×
742
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
743
        if err != nil {
×
744
                klog.Errorf("failed to list interface pod map: %v", err)
×
745
                return
×
746
        }
×
747
        for iface, pod := range interfacePodMap {
×
748
                parts := strings.Split(pod, "/")
×
749
                if len(parts) < 3 {
×
750
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
751
                        continue
×
752
                }
753

754
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
755
                if strings.Contains(errText, "No such device") {
×
756
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
757
                        if err := ovs.CleanInterface(iface); err != nil {
×
758
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
759
                        }
×
760
                        continue
×
761
                }
762

763
                if _, err := c.podsLister.Pods(podNamespace).Get(podName); err != nil && k8serrors.IsNotFound(err) {
×
764
                        // Pod not found by name. Check if this might be a KubeVirt VM.
×
765
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VM name (not the launcher pod name).
×
766
                        // The actual launcher pod has the label 'vm.kubevirt.io/name' with the VM name as value.
×
767
                        // Try to find launcher pods by this label.
×
768
                        selector := labels.SelectorFromSet(map[string]string{util.KubeVirtVMNameLabel: podName})
×
769
                        launcherPods, listErr := c.podsLister.Pods(podNamespace).List(selector)
×
770
                        if listErr != nil {
×
771
                                klog.Errorf("failed to list launcher pods for vm %s/%s: %v", podNamespace, podName, listErr)
×
772
                                continue
×
773
                        }
774

775
                        // If we found launcher pod(s) for this VM, keep the interface
776
                        if len(launcherPods) > 0 {
×
777
                                klog.V(5).Infof("found %d launcher pod(s) for vm %s/%s, keeping ovs interface %s",
×
778
                                        len(launcherPods), podNamespace, podName, iface)
×
779
                                continue
×
780
                        }
781

782
                        // No pod and no launcher pod found - safe to delete
783
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
784
                        if err := ovs.CleanInterface(iface); err != nil {
×
785
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
786
                        }
×
787
                }
788
        }
789
}
790

791
func (c *Controller) runIPSecWorker() {
×
792
        if err := c.StartIPSecService(); err != nil {
×
793
                klog.Errorf("starting ipsec service: %v", err)
×
794
        }
×
795

796
        for c.processNextIPSecWorkItem() {
×
797
        }
×
798
}
799

800
func (c *Controller) processNextIPSecWorkItem() bool {
×
801
        key, shutdown := c.ipsecQueue.Get()
×
802
        if shutdown {
×
803
                return false
×
804
        }
×
805
        defer c.ipsecQueue.Done(key)
×
806

×
807
        err := func(key string) error {
×
808
                if err := c.SyncIPSecKeys(key); err != nil {
×
809
                        c.ipsecQueue.AddRateLimited(key)
×
810
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
811
                }
×
812
                c.ipsecQueue.Forget(key)
×
813
                return nil
×
814
        }(key)
815
        if err != nil {
×
816
                utilruntime.HandleError(err)
×
817
                return true
×
818
        }
×
819
        return true
×
820
}
821

822
func (c *Controller) runUpdateNodeWorker() {
×
823
        for c.processNextUpdateNodeWorkItem() {
×
824
        }
×
825
}
826

827
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
828
        key, shutdown := c.updateNodeQueue.Get()
×
829
        if shutdown {
×
830
                return false
×
831
        }
×
832

833
        err := func(key string) error {
×
834
                defer c.updateNodeQueue.Done(key)
×
835
                if err := c.handleUpdateNode(key); err != nil {
×
836
                        c.updateNodeQueue.AddRateLimited(key)
×
837
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
838
                }
×
839
                c.updateNodeQueue.Forget(key)
×
840
                return nil
×
841
        }(key)
842
        if err != nil {
×
843
                utilruntime.HandleError(err)
×
844
                return true
×
845
        }
×
846
        return true
×
847
}
848

849
func (c *Controller) handleUpdateNode(key string) error {
×
850
        node, err := c.nodesLister.Get(key)
×
851
        if err != nil {
×
852
                if k8serrors.IsNotFound(err) {
×
853
                        return nil
×
854
                }
×
855
                klog.Error(err)
×
856
                return err
×
857
        }
858

859
        klog.Infof("updating node networks for node %s", key)
×
860
        return c.config.UpdateNodeNetworks(node)
×
861
}
862

863
// Run starts controller
864
func (c *Controller) Run(stopCh <-chan struct{}) {
×
865
        defer utilruntime.HandleCrash()
×
866
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
867
        defer c.deleteProviderNetworkQueue.ShutDown()
×
868
        defer c.subnetQueue.ShutDown()
×
869
        defer c.serviceQueue.ShutDown()
×
870
        defer c.updatePodQueue.ShutDown()
×
871
        defer c.deletePodQueue.ShutDown()
×
872
        defer c.ipsecQueue.ShutDown()
×
873
        defer c.updateNodeQueue.ShutDown()
×
874
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
875
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
876
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
877

×
878
        if err := c.setIPSet(); err != nil {
×
879
                util.LogFatalAndExit(err, "failed to set ipsets")
×
880
        }
×
881

882
        klog.Info("Started workers")
×
883
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
884
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
885
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
886
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
887
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
888
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
889
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
890
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
891
        go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
×
892
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
893
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
894
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
895
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
896
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
897
        go wait.Until(func() {
×
898
                if err := c.reconcileRouters(nil); err != nil {
×
899
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
900
                }
×
901
        }, 3*time.Second, stopCh)
902

903
        if c.config.EnableTProxy {
×
904
                go c.StartTProxyForwarding()
×
905
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
906
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
907
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
908
                // kubelet to tproxy, if probe success recover the iptable rules
×
909
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
910
        } else {
×
911
                c.cleanTProxyConfig()
×
912
        }
×
913

914
        if !c.config.EnableOVNIPSec {
×
915
                if err := c.StopAndClearIPSecResource(); err != nil {
×
916
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
917
                }
×
918
        }
919

920
        <-stopCh
×
921
        klog.Info("Shutting down workers")
×
922
}
923

924
func recompute() {
×
925
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
926
        if err != nil {
×
927
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
928
        }
×
929
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc