• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 18518503263

15 Oct 2025 05:11AM UTC coverage: 21.087% (+0.06%) from 21.028%
18518503263

push

github

web-flow
remove internal-port type interface (#5794)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

0 of 26 new or added lines in 3 files covered. (0.0%)

6 existing lines in 2 files now uncovered.

10682 of 50656 relevant lines covered (21.09%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "os/exec"
9
        "slices"
10
        "strconv"
11
        "strings"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        listerv1 "k8s.io/client-go/listers/core/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        k8sexec "k8s.io/utils/exec"
30

31
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
32
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
33
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
34
        "github.com/kubeovn/kube-ovn/pkg/ovs"
35
        "github.com/kubeovn/kube-ovn/pkg/util"
36
)
37

38
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
39
type Controller struct {
40
        config *Configuration
41

42
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
43
        providerNetworksSynced          cache.InformerSynced
44
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
45
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
46

47
        vlansLister kubeovnlister.VlanLister
48
        vlansSynced cache.InformerSynced
49

50
        subnetsLister kubeovnlister.SubnetLister
51
        subnetsSynced cache.InformerSynced
52
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
53

54
        ovnEipsLister kubeovnlister.OvnEipLister
55
        ovnEipsSynced cache.InformerSynced
56

57
        podsLister     listerv1.PodLister
58
        podsSynced     cache.InformerSynced
59
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
60
        deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent]
61

62
        nodesLister listerv1.NodeLister
63
        nodesSynced cache.InformerSynced
64

65
        servicesLister listerv1.ServiceLister
66
        servicesSynced cache.InformerSynced
67
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
68

69
        caSecretLister listerv1.SecretLister
70
        caSecretSynced cache.InformerSynced
71
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
72

73
        recorder record.EventRecorder
74

75
        protocol string
76

77
        ControllerRuntime
78
        localPodName   string
79
        localNamespace string
80

81
        k8sExec k8sexec.Interface
82
}
83

84
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
85
        if rateLimiter == nil {
×
86
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
87
        }
×
88
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
89
}
90

91
// NewController init a daemon controller
92
func NewController(config *Configuration,
93
        stopCh <-chan struct{},
94
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
95
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
96
) (*Controller, error) {
×
97
        eventBroadcaster := record.NewBroadcaster()
×
98
        eventBroadcaster.StartLogging(klog.Infof)
×
99
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
100
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
101
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
102
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
103
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
104
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
105
        podInformer := podInformerFactory.Core().V1().Pods()
×
106
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
107
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
108
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
109

×
110
        controller := &Controller{
×
111
                config: config,
×
112

×
113
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
114
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
115
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
116
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
117

×
118
                vlansLister: vlanInformer.Lister(),
×
119
                vlansSynced: vlanInformer.Informer().HasSynced,
×
120

×
121
                subnetsLister: subnetInformer.Lister(),
×
122
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
123
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
124

×
125
                ovnEipsLister: ovnEipInformer.Lister(),
×
126
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
127

×
128
                podsLister:     podInformer.Lister(),
×
129
                podsSynced:     podInformer.Informer().HasSynced,
×
130
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
131
                deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil),
×
132

×
133
                nodesLister: nodeInformer.Lister(),
×
134
                nodesSynced: nodeInformer.Informer().HasSynced,
×
135

×
136
                servicesLister: servicesInformer.Lister(),
×
137
                servicesSynced: servicesInformer.Informer().HasSynced,
×
138
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
139

×
140
                caSecretLister: caSecretInformer.Lister(),
×
141
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
142
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
143

×
144
                recorder: recorder,
×
145
                k8sExec:  k8sexec.New(),
×
146
        }
×
147

×
148
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
149
        if err != nil {
×
150
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
151
        }
×
152
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
153

×
154
        if err = controller.initRuntime(); err != nil {
×
155
                return nil, err
×
156
        }
×
157

158
        podInformerFactory.Start(stopCh)
×
159
        nodeInformerFactory.Start(stopCh)
×
160
        kubeovnInformerFactory.Start(stopCh)
×
161
        caSecretInformerFactory.Start(stopCh)
×
162

×
163
        if !cache.WaitForCacheSync(stopCh,
×
164
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
165
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
166
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
167
        }
×
168

169
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
170
                AddFunc:    controller.enqueueAddProviderNetwork,
×
171
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
172
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
173
        }); err != nil {
×
174
                return nil, err
×
175
        }
×
176
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
177
                UpdateFunc: controller.enqueueUpdateVlan,
×
178
        }); err != nil {
×
179
                return nil, err
×
180
        }
×
181
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
182
                AddFunc:    controller.enqueueAddSubnet,
×
183
                UpdateFunc: controller.enqueueUpdateSubnet,
×
184
                DeleteFunc: controller.enqueueDeleteSubnet,
×
185
        }); err != nil {
×
186
                return nil, err
×
187
        }
×
188
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
189
                AddFunc:    controller.enqueueAddService,
×
190
                DeleteFunc: controller.enqueueDeleteService,
×
191
                UpdateFunc: controller.enqueueUpdateService,
×
192
        }); err != nil {
×
193
                util.LogFatalAndExit(err, "failed to add service event handler")
×
194
        }
×
195

196
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
197
                UpdateFunc: controller.enqueueUpdatePod,
×
198
                DeleteFunc: controller.enqueueDeletePod,
×
199
        }); err != nil {
×
200
                return nil, err
×
201
        }
×
202
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
203
                AddFunc:    controller.enqueueAddIPSecCA,
×
204
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
205
        }); err != nil {
×
206
                return nil, err
×
207
        }
×
208

209
        return controller, nil
×
210
}
211

212
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
213
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
214
        klog.V(3).Infof("enqueue add CA %s", key)
×
215
        c.ipsecQueue.Add(key)
×
216
}
×
217

218
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
219
        oldSecret := oldObj.(*v1.Secret)
×
220
        newSecret := newObj.(*v1.Secret)
×
221
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
222
                // No changes in CA data, no need to enqueue
×
223
                return
×
224
        }
×
225

226
        key := cache.MetaObjectToName(newSecret).String()
×
227
        klog.V(3).Infof("enqueue update CA %s", key)
×
228
        c.ipsecQueue.Add(key)
×
229
}
230

231
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
232
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
233
        klog.V(3).Infof("enqueue add provider network %s", key)
×
234
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
235
}
×
236

237
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
238
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
239
        klog.V(3).Infof("enqueue update provider network %s", key)
×
240
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
241
}
×
242

243
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
244
        var pn *kubeovnv1.ProviderNetwork
×
245
        switch t := obj.(type) {
×
246
        case *kubeovnv1.ProviderNetwork:
×
247
                pn = t
×
248
        case cache.DeletedFinalStateUnknown:
×
249
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
250
                if !ok {
×
251
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
252
                        return
×
253
                }
×
254
                pn = p
×
255
        default:
×
256
                klog.Warningf("unexpected type: %T", obj)
×
257
                return
×
258
        }
259

260
        key := cache.MetaObjectToName(pn).String()
×
261
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
262
        c.deleteProviderNetworkQueue.Add(pn)
×
263
}
264

265
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
266
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
267
        }
×
268
}
269

270
func (c *Controller) runDeleteProviderNetworkWorker() {
×
271
        for c.processNextDeleteProviderNetworkWorkItem() {
×
272
        }
×
273
}
274

275
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
276
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
277
        if shutdown {
×
278
                return false
×
279
        }
×
280

281
        err := func(key string) error {
×
282
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
283
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
284
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
285
                }
×
286
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
287
                return nil
×
288
        }(key)
289
        if err != nil {
×
290
                utilruntime.HandleError(err)
×
291
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
292
                return true
×
293
        }
×
294
        return true
×
295
}
296

297
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
298
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
299
        if shutdown {
×
300
                return false
×
301
        }
×
302

303
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
304
                defer c.deleteProviderNetworkQueue.Done(obj)
×
305
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
306
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
307
                }
×
308
                c.deleteProviderNetworkQueue.Forget(obj)
×
309
                return nil
×
310
        }(obj)
311
        if err != nil {
×
312
                utilruntime.HandleError(err)
×
313
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
314
                return true
×
315
        }
×
316
        return true
×
317
}
318

319
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
320
        klog.V(3).Infof("handle update provider network %s", key)
×
321
        node, err := c.nodesLister.Get(c.config.NodeName)
×
322
        if err != nil {
×
323
                klog.Error(err)
×
324
                return err
×
325
        }
×
326
        pn, err := c.providerNetworksLister.Get(key)
×
327
        if err != nil {
×
328
                if k8serrors.IsNotFound(err) {
×
329
                        return nil
×
330
                }
×
331
                klog.Error(err)
×
332
                return err
×
333
        }
334

335
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
336
        if err != nil {
×
337
                klog.Error(err)
×
338
                return err
×
339
        }
×
340

341
        if excluded {
×
342
                c.recordProviderNetworkErr(pn.Name, "")
×
343
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
344
        }
×
345
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
346
}
347

348
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
349
        nic := pn.Spec.DefaultInterface
×
350
        for _, item := range pn.Spec.CustomInterfaces {
×
351
                if slices.Contains(item.Nodes, node.Name) {
×
352
                        nic = item.Interface
×
353
                        break
×
354
                }
355
        }
356

357
        patch := util.KVPatch{
×
358
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
359
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
360
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
361
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
362
        }
×
363

×
364
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
365
        for _, vlanName := range pn.Status.Vlans {
×
366
                vlan, err := c.vlansLister.Get(vlanName)
×
367
                if err != nil {
×
368
                        if k8serrors.IsNotFound(err) {
×
369
                                klog.Infof("vlan %s not found", vlanName)
×
370
                                continue
×
371
                        }
372
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
373
                        return err
×
374
                }
375
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
376
        }
377
        // always add trunk 0 so that the ovs bridge can communicate with the external network
378
        vlans.Add("0")
×
379

×
380
        var mtu int
×
381
        var err error
×
382
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
383
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
384
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
385
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
386
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
387
                }
×
388
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
389
                return err
×
390
        }
391

392
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
393
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
394
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
395
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
396
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
397
                return err
×
398
        }
×
399
        c.recordProviderNetworkErr(pn.Name, "")
×
400
        return nil
×
401
}
402

403
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
404
        var currentPod *v1.Pod
×
405
        var err error
×
406
        if c.localPodName == "" {
×
407
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
408
                        LabelSelector: "app=kube-ovn-cni",
×
409
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
410
                })
×
411
                if err != nil {
×
412
                        klog.Errorf("failed to list pod: %v", err)
×
413
                        return
×
414
                }
×
415
                for _, pod := range pods.Items {
×
416
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
417
                                c.localPodName = pod.Name
×
418
                                c.localNamespace = pod.Namespace
×
419
                                currentPod = &pod
×
420
                                break
×
421
                        }
422
                }
423
                if currentPod == nil {
×
424
                        klog.Warning("failed to get self pod")
×
425
                        return
×
426
                }
×
427
        } else {
×
428
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
429
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
430
                        return
×
431
                }
×
432
        }
433

434
        patch := util.KVPatch{}
×
435
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
436
                if errMsg == "" {
×
437
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
438
                } else {
×
439
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
440
                }
×
441
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
442
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
443
                        return
×
444
                }
×
445
        }
446
}
447

448
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
449
        patch := util.KVPatch{
×
450
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
451
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
452
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
453
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
454
        }
×
455
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
456
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
457
                return err
×
458
        }
×
459

460
        return c.ovsCleanProviderNetwork(pn.Name)
×
461
}
462

463
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
464
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
465
                klog.Error(err)
×
466
                return err
×
467
        }
×
468

469
        node, err := c.nodesLister.Get(c.config.NodeName)
×
470
        if err != nil {
×
471
                klog.Error(err)
×
472
                return err
×
473
        }
×
474
        if len(node.Labels) == 0 {
×
475
                return nil
×
476
        }
×
477

478
        patch := util.KVPatch{
×
479
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
480
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
481
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
482
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
483
        }
×
484
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
485
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
486
                return err
×
487
        }
×
488

489
        return nil
×
490
}
491

492
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
493
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
494
        newVlan := newObj.(*kubeovnv1.Vlan)
×
495
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
496
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
497
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
498
        }
×
499
}
500

501
// subnetEvent is the work item stored on the subnet queue. Add events set only
// newObj, delete events only oldObj, and update events set both.
type subnetEvent struct {
	oldObj any
	newObj any
}
504

505
// serviceEvent is the work item stored on the service queue. Add events set
// only newObj, delete events only oldObj, and update events set both.
type serviceEvent struct {
	oldObj any
	newObj any
}
508

509
// podEvent is the work item stored on the delete pod queue; oldObj holds the
// deleted pod.
type podEvent struct {
	oldObj any
}
512

513
func (c *Controller) enqueueAddSubnet(obj any) {
×
514
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
515
}
×
516

517
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
518
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
519
}
×
520

521
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
522
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
523
}
×
524

525
func (c *Controller) runSubnetWorker() {
×
526
        for c.processNextSubnetWorkItem() {
×
527
        }
×
528
}
529

530
func (c *Controller) enqueueAddService(obj any) {
×
531
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
532
}
×
533

534
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
535
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
536
}
×
537

538
func (c *Controller) enqueueDeleteService(obj any) {
×
539
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
540
}
×
541

542
func (c *Controller) runAddOrUpdateServicekWorker() {
×
543
        for c.processNextServiceWorkItem() {
×
544
        }
×
545
}
546

547
func (c *Controller) processNextSubnetWorkItem() bool {
×
548
        obj, shutdown := c.subnetQueue.Get()
×
549
        if shutdown {
×
550
                return false
×
551
        }
×
552

553
        err := func(obj *subnetEvent) error {
×
554
                defer c.subnetQueue.Done(obj)
×
555
                if err := c.reconcileRouters(obj); err != nil {
×
556
                        c.subnetQueue.AddRateLimited(obj)
×
557
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
558
                }
×
559
                c.subnetQueue.Forget(obj)
×
560
                return nil
×
561
        }(obj)
562
        if err != nil {
×
563
                utilruntime.HandleError(err)
×
564
                return true
×
565
        }
×
566
        return true
×
567
}
568

569
func (c *Controller) processNextServiceWorkItem() bool {
×
570
        obj, shutdown := c.serviceQueue.Get()
×
571
        if shutdown {
×
572
                return false
×
573
        }
×
574

575
        err := func(obj *serviceEvent) error {
×
576
                defer c.serviceQueue.Done(obj)
×
577
                if err := c.reconcileServices(obj); err != nil {
×
578
                        c.serviceQueue.AddRateLimited(obj)
×
579
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
580
                }
×
581
                c.serviceQueue.Forget(obj)
×
582
                return nil
×
583
        }(obj)
584
        if err != nil {
×
585
                utilruntime.HandleError(err)
×
586
                return true
×
587
        }
×
588
        return true
×
589
}
590

591
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
592
        oldPod := oldObj.(*v1.Pod)
×
593
        newPod := newObj.(*v1.Pod)
×
594
        key := cache.MetaObjectToName(newPod).String()
×
595

×
596
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
597
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
598
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
599
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
600
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
601
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
602
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
603
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
604
                c.updatePodQueue.Add(key)
×
605
                return
×
606
        }
×
607

608
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
609
        if err != nil {
×
610
                return
×
611
        }
×
612
        for _, multiNet := range attachNets {
×
613
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
614
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
615
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
616
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
617
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
618
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
619
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
620
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
621
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
622
                                c.updatePodQueue.Add(key)
×
623
                        }
×
624
                }
625
        }
626
}
627

628
func (c *Controller) enqueueDeletePod(obj any) {
×
629
        var pod *v1.Pod
×
630
        switch t := obj.(type) {
×
631
        case *v1.Pod:
×
632
                pod = t
×
633
        case cache.DeletedFinalStateUnknown:
×
634
                p, ok := t.Obj.(*v1.Pod)
×
635
                if !ok {
×
636
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
637
                        return
×
638
                }
×
639
                pod = p
×
640
        default:
×
641
                klog.Warningf("unexpected type: %T", obj)
×
642
                return
×
643
        }
644

645
        klog.V(3).Infof("enqueue delete pod %s", pod.Name)
×
646
        c.deletePodQueue.Add(&podEvent{oldObj: pod})
×
647
}
648

649
func (c *Controller) runUpdatePodWorker() {
×
650
        for c.processNextUpdatePodWorkItem() {
×
651
        }
×
652
}
653

654
func (c *Controller) runDeletePodWorker() {
×
655
        for c.processNextDeletePodWorkItem() {
×
656
        }
×
657
}
658

659
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
660
        key, shutdown := c.updatePodQueue.Get()
×
661
        if shutdown {
×
662
                return false
×
663
        }
×
664

665
        err := func(key string) error {
×
666
                defer c.updatePodQueue.Done(key)
×
667
                if err := c.handleUpdatePod(key); err != nil {
×
668
                        c.updatePodQueue.AddRateLimited(key)
×
669
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
670
                }
×
671
                c.updatePodQueue.Forget(key)
×
672
                return nil
×
673
        }(key)
674
        if err != nil {
×
675
                utilruntime.HandleError(err)
×
676
                return true
×
677
        }
×
678
        return true
×
679
}
680

681
func (c *Controller) processNextDeletePodWorkItem() bool {
×
682
        event, shutdown := c.deletePodQueue.Get()
×
683
        if shutdown {
×
684
                return false
×
685
        }
×
686

687
        err := func(event *podEvent) error {
×
688
                defer c.deletePodQueue.Done(event)
×
689
                if err := c.handleDeletePod(event); err != nil {
×
690
                        c.deletePodQueue.AddRateLimited(event)
×
691
                        return fmt.Errorf("error syncing pod event: %w, requeuing", err)
×
692
                }
×
693
                c.deletePodQueue.Forget(event)
×
694
                return nil
×
695
        }(event)
696
        if err != nil {
×
697
                utilruntime.HandleError(err)
×
698
                return true
×
699
        }
×
700
        return true
×
701
}
702

UNCOV
703
func (c *Controller) gcInterfaces() {
×
UNCOV
704
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
705
        if err != nil {
×
706
                klog.Errorf("failed to list interface pod map: %v", err)
×
707
                return
×
708
        }
×
709
        for iface, pod := range interfacePodMap {
×
710
                parts := strings.Split(pod, "/")
×
711
                if len(parts) < 3 {
×
712
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
713
                        continue
×
714
                }
715

716
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
717
                if strings.Contains(errText, "No such device") {
×
718
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
719
                        if err := ovs.CleanInterface(iface); err != nil {
×
720
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
721
                        }
×
722
                        continue
×
723
                }
724

725
                if _, err := c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
726
                        if k8serrors.IsNotFound(err) {
×
727
                                klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
728
                                if err := ovs.CleanInterface(iface); err != nil {
×
729
                                        klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
730
                                }
×
731
                        }
732
                }
733
        }
734
}
735

736
func (c *Controller) runIPSecWorker() {
×
737
        if err := c.StartIPSecService(); err != nil {
×
738
                klog.Errorf("starting ipsec service: %v", err)
×
739
        }
×
740

741
        for c.processNextIPSecWorkItem() {
×
742
        }
×
743
}
744

745
func (c *Controller) processNextIPSecWorkItem() bool {
×
746
        key, shutdown := c.ipsecQueue.Get()
×
747
        if shutdown {
×
748
                return false
×
749
        }
×
750
        defer c.ipsecQueue.Done(key)
×
751

×
752
        err := func(key string) error {
×
753
                if err := c.SyncIPSecKeys(key); err != nil {
×
754
                        c.ipsecQueue.AddRateLimited(key)
×
755
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
756
                }
×
757
                c.ipsecQueue.Forget(key)
×
758
                return nil
×
759
        }(key)
760
        if err != nil {
×
761
                utilruntime.HandleError(err)
×
762
                return true
×
763
        }
×
764
        return true
×
765
}
766

767
// Run starts controller
768
func (c *Controller) Run(stopCh <-chan struct{}) {
×
769
        defer utilruntime.HandleCrash()
×
770
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
771
        defer c.deleteProviderNetworkQueue.ShutDown()
×
772
        defer c.subnetQueue.ShutDown()
×
773
        defer c.serviceQueue.ShutDown()
×
774
        defer c.updatePodQueue.ShutDown()
×
775
        defer c.deletePodQueue.ShutDown()
×
776
        defer c.ipsecQueue.ShutDown()
×
777
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
778
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
779
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
780

×
781
        if err := c.setIPSet(); err != nil {
×
782
                util.LogFatalAndExit(err, "failed to set ipsets")
×
783
        }
×
784

785
        klog.Info("Started workers")
×
786
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
787
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
788
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
789
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
790
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
791
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
792
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
793
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
794
        go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
×
795
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
796
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
797
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
798
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
799
        go wait.Until(func() {
×
800
                if err := c.reconcileRouters(nil); err != nil {
×
801
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
802
                }
×
803
        }, 3*time.Second, stopCh)
804

805
        if c.config.EnableTProxy {
×
806
                go c.StartTProxyForwarding()
×
807
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
808
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
809
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
810
                // kubelet to tproxy, if probe success recover the iptable rules
×
811
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
812
        } else {
×
813
                c.cleanTProxyConfig()
×
814
        }
×
815

816
        if !c.config.EnableOVNIPSec {
×
817
                if err := c.StopAndClearIPSecResource(); err != nil {
×
818
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
819
                }
×
820
        }
821

822
        <-stopCh
×
823
        klog.Info("Shutting down workers")
×
824
}
825

826
func recompute() {
×
827
        output, err := exec.Command("ovn-appctl", "-t", "ovn-controller", "inc-engine/recompute").CombinedOutput()
×
828
        if err != nil {
×
829
                klog.Errorf("failed to recompute ovn-controller %q", output)
×
830
        }
×
831
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc