• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 17228354799

26 Aug 2025 04:55AM UTC coverage: 21.341% (-0.2%) from 21.508%
17228354799

push

github

oilbeater
handle delete final state unknown object in enqueue handler (#5649)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

0 of 443 new or added lines in 27 files covered. (0.0%)

1 existing line in 1 file now uncovered.

10514 of 49267 relevant lines covered (21.34%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "context"
5
        "fmt"
6
        "os/exec"
7
        "slices"
8
        "strconv"
9
        "time"
10

11
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
12
        "github.com/scylladb/go-set/strset"
13
        v1 "k8s.io/api/core/v1"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17
        "k8s.io/apimachinery/pkg/util/wait"
18
        "k8s.io/client-go/informers"
19
        "k8s.io/client-go/kubernetes/scheme"
20
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
21
        listerv1 "k8s.io/client-go/listers/core/v1"
22
        "k8s.io/client-go/tools/cache"
23
        "k8s.io/client-go/tools/record"
24
        "k8s.io/client-go/util/workqueue"
25
        "k8s.io/klog/v2"
26
        k8sexec "k8s.io/utils/exec"
27

28
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
29
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
30
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
31
        "github.com/kubeovn/kube-ovn/pkg/ovs"
32
        "github.com/kubeovn/kube-ovn/pkg/util"
33
)
34

35
// Controller watches pod and namespace changes to update iptables, ipset and OVS QoS.
36
type Controller struct {
37
        // config is the daemon configuration handed to NewController.
        config *Configuration
38

39
        // Provider networks: lister, cache-sync check and work queues.
        // Add/update items are string keys; delete items are the removed objects.
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
40
        providerNetworksSynced          cache.InformerSynced
41
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
42
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
43

44
        // VLANs: lister and cache-sync check.
        vlansLister kubeovnlister.VlanLister
45
        vlansSynced cache.InformerSynced
46

47
        // Subnets: lister, cache-sync check and a queue of old/new object pairs.
        subnetsLister kubeovnlister.SubnetLister
48
        subnetsSynced cache.InformerSynced
49
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
50

51
        // OVN EIPs: lister and cache-sync check.
        ovnEipsLister kubeovnlister.OvnEipLister
52
        ovnEipsSynced cache.InformerSynced
53

54
        // Pods: lister, cache-sync check and separate update/delete queues keyed by string.
        podsLister     listerv1.PodLister
55
        podsSynced     cache.InformerSynced
56
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
57
        deletePodQueue workqueue.TypedRateLimitingInterface[string]
58

59
        // Nodes: lister and cache-sync check.
        nodesLister listerv1.NodeLister
60
        nodesSynced cache.InformerSynced
61

62
        // Services: lister, cache-sync check and a queue of old/new object pairs.
        servicesLister listerv1.ServiceLister
63
        servicesSynced cache.InformerSynced
64
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
65

66
        // recorder is the event recorder created in NewController.
        recorder record.EventRecorder
67

68
        // protocol is set in NewController from the node's IP address annotation
        // via util.CheckProtocol.
        protocol string
69

70
        // ControllerRuntime is embedded; it is declared elsewhere in the package.
        ControllerRuntime
71
        // localPodName and localNamespace are cached by recordProviderNetworkErr
        // once it has found the running kube-ovn-cni pod on this node.
        localPodName   string
72
        localNamespace string
73

74
        // k8sExec is initialised with k8sexec.New() in NewController.
        k8sExec k8sexec.Interface
75
}
76

77
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
78
        if rateLimiter == nil {
×
79
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
80
        }
×
81
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
82
}
83

84
// NewController init a daemon controller
85
func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) {
×
86
        eventBroadcaster := record.NewBroadcaster()
×
87
        eventBroadcaster.StartLogging(klog.Infof)
×
88
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
89
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
90
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
91
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
92
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
93
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
94
        podInformer := podInformerFactory.Core().V1().Pods()
×
95
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
96
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
97

×
98
        controller := &Controller{
×
99
                config: config,
×
100

×
101
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
102
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
103
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
104
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
105

×
106
                vlansLister: vlanInformer.Lister(),
×
107
                vlansSynced: vlanInformer.Informer().HasSynced,
×
108

×
109
                subnetsLister: subnetInformer.Lister(),
×
110
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
111
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
112

×
113
                ovnEipsLister: ovnEipInformer.Lister(),
×
114
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
115

×
116
                podsLister:     podInformer.Lister(),
×
117
                podsSynced:     podInformer.Informer().HasSynced,
×
118
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
119
                deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
×
120

×
121
                nodesLister: nodeInformer.Lister(),
×
122
                nodesSynced: nodeInformer.Informer().HasSynced,
×
123

×
124
                servicesLister: servicesInformer.Lister(),
×
125
                servicesSynced: servicesInformer.Informer().HasSynced,
×
126
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
127

×
128
                recorder: recorder,
×
129
                k8sExec:  k8sexec.New(),
×
130
        }
×
131

×
132
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
133
        if err != nil {
×
134
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
135
        }
×
136
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
137

×
138
        if err = controller.initRuntime(); err != nil {
×
139
                return nil, err
×
140
        }
×
141

142
        podInformerFactory.Start(stopCh)
×
143
        nodeInformerFactory.Start(stopCh)
×
144
        kubeovnInformerFactory.Start(stopCh)
×
145

×
146
        if !cache.WaitForCacheSync(stopCh,
×
147
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
148
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced) {
×
149
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
150
        }
×
151

152
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
153
                AddFunc:    controller.enqueueAddProviderNetwork,
×
154
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
155
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
156
        }); err != nil {
×
157
                return nil, err
×
158
        }
×
159
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
160
                UpdateFunc: controller.enqueueUpdateVlan,
×
161
        }); err != nil {
×
162
                return nil, err
×
163
        }
×
164
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
165
                AddFunc:    controller.enqueueAddSubnet,
×
166
                UpdateFunc: controller.enqueueUpdateSubnet,
×
167
                DeleteFunc: controller.enqueueDeleteSubnet,
×
168
        }); err != nil {
×
169
                return nil, err
×
170
        }
×
171
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
172
                AddFunc:    controller.enqueueAddService,
×
173
                DeleteFunc: controller.enqueueDeleteService,
×
174
                UpdateFunc: controller.enqueueUpdateService,
×
175
        }); err != nil {
×
176
                util.LogFatalAndExit(err, "failed to add service event handler")
×
177
        }
×
178

179
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
180
                UpdateFunc: controller.enqueueUpdatePod,
×
181
                DeleteFunc: controller.enqueueDeletePod,
×
182
        }); err != nil {
×
183
                return nil, err
×
184
        }
×
185

186
        return controller, nil
×
187
}
188

189
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
190
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
191
        klog.V(3).Infof("enqueue add provider network %s", key)
×
192
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
193
}
×
194

195
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
196
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
197
        klog.V(3).Infof("enqueue update provider network %s", key)
×
198
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
199
}
×
200

201
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
NEW
202
        var pn *kubeovnv1.ProviderNetwork
×
NEW
203
        switch t := obj.(type) {
×
NEW
204
        case *kubeovnv1.ProviderNetwork:
×
NEW
205
                pn = t
×
NEW
206
        case cache.DeletedFinalStateUnknown:
×
NEW
207
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
NEW
208
                if !ok {
×
NEW
209
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
NEW
210
                        return
×
NEW
211
                }
×
NEW
212
                pn = p
×
NEW
213
        default:
×
NEW
214
                klog.Warningf("unexpected type: %T", obj)
×
NEW
215
                return
×
216
        }
217

218
        key := cache.MetaObjectToName(pn).String()
×
219
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
220
        c.deleteProviderNetworkQueue.Add(pn)
×
221
}
222

223
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
224
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
225
        }
×
226
}
227

228
func (c *Controller) runDeleteProviderNetworkWorker() {
×
229
        for c.processNextDeleteProviderNetworkWorkItem() {
×
230
        }
×
231
}
232

233
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
234
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
235
        if shutdown {
×
236
                return false
×
237
        }
×
238

239
        err := func(key string) error {
×
240
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
241
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
242
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
243
                }
×
244
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
245
                return nil
×
246
        }(key)
247
        if err != nil {
×
248
                utilruntime.HandleError(err)
×
249
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
250
                return true
×
251
        }
×
252
        return true
×
253
}
254

255
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
256
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
257
        if shutdown {
×
258
                return false
×
259
        }
×
260

261
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
262
                defer c.deleteProviderNetworkQueue.Done(obj)
×
263
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
264
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
265
                }
×
266
                c.deleteProviderNetworkQueue.Forget(obj)
×
267
                return nil
×
268
        }(obj)
269
        if err != nil {
×
270
                utilruntime.HandleError(err)
×
271
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
272
                return true
×
273
        }
×
274
        return true
×
275
}
276

277
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
278
        klog.V(3).Infof("handle update provider network %s", key)
×
279
        node, err := c.nodesLister.Get(c.config.NodeName)
×
280
        if err != nil {
×
281
                klog.Error(err)
×
282
                return err
×
283
        }
×
284
        pn, err := c.providerNetworksLister.Get(key)
×
285
        if err != nil {
×
286
                if k8serrors.IsNotFound(err) {
×
287
                        return nil
×
288
                }
×
289
                klog.Error(err)
×
290
                return err
×
291
        }
292

293
        if slices.Contains(pn.Spec.ExcludeNodes, node.Name) {
×
294
                c.recordProviderNetworkErr(pn.Name, "")
×
295
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
296
        }
×
297
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
298
}
299

300
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
301
        nic := pn.Spec.DefaultInterface
×
302
        for _, item := range pn.Spec.CustomInterfaces {
×
303
                if slices.Contains(item.Nodes, node.Name) {
×
304
                        nic = item.Interface
×
305
                        break
×
306
                }
307
        }
308

309
        patch := util.KVPatch{
×
310
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
311
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
312
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
313
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
314
        }
×
315

×
316
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
317
        for _, vlanName := range pn.Status.Vlans {
×
318
                vlan, err := c.vlansLister.Get(vlanName)
×
319
                if err != nil {
×
320
                        if k8serrors.IsNotFound(err) {
×
321
                                klog.Infof("vlan %s not found", vlanName)
×
322
                                continue
×
323
                        }
324
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
325
                        return err
×
326
                }
327
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
328
        }
329
        // always add trunk 0 so that the ovs bridge can communicate with the external network
330
        vlans.Add("0")
×
331

×
332
        var mtu int
×
333
        var err error
×
334
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
335
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
336
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
337
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
338
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
339
                }
×
340
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
341
                return err
×
342
        }
343

344
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
345
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
346
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
347
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
348
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
349
                return err
×
350
        }
×
351
        c.recordProviderNetworkErr(pn.Name, "")
×
352
        return nil
×
353
}
354

355
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
356
        var currentPod *v1.Pod
×
357
        var err error
×
358
        if c.localPodName == "" {
×
359
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
360
                        LabelSelector: "app=kube-ovn-cni",
×
361
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
362
                })
×
363
                if err != nil {
×
364
                        klog.Errorf("failed to list pod: %v", err)
×
365
                        return
×
366
                }
×
367
                for _, pod := range pods.Items {
×
368
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
369
                                c.localPodName = pod.Name
×
370
                                c.localNamespace = pod.Namespace
×
371
                                currentPod = &pod
×
372
                                break
×
373
                        }
374
                }
375
                if currentPod == nil {
×
376
                        klog.Warning("failed to get self pod")
×
377
                        return
×
378
                }
×
379
        } else {
×
380
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
381
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
382
                        return
×
383
                }
×
384
        }
385

386
        patch := util.KVPatch{}
×
387
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
388
                if errMsg == "" {
×
389
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
390
                } else {
×
391
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
392
                }
×
393
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
394
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
395
                        return
×
396
                }
×
397
        }
398
}
399

400
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
401
        patch := util.KVPatch{
×
402
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
403
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
404
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
405
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
406
        }
×
407
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
408
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
409
                return err
×
410
        }
×
411

412
        return c.ovsCleanProviderNetwork(pn.Name)
×
413
}
414

415
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
416
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
417
                klog.Error(err)
×
418
                return err
×
419
        }
×
420

421
        node, err := c.nodesLister.Get(c.config.NodeName)
×
422
        if err != nil {
×
423
                klog.Error(err)
×
424
                return err
×
425
        }
×
426
        if len(node.Labels) == 0 {
×
427
                return nil
×
428
        }
×
429

430
        patch := util.KVPatch{
×
431
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
432
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
433
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
434
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
435
        }
×
436
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
437
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
438
                return err
×
439
        }
×
440

441
        return nil
×
442
}
443

444
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
445
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
446
        newVlan := newObj.(*kubeovnv1.Vlan)
×
447
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
448
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
449
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
450
        }
×
451
}
452

453
// subnetEvent is the item type of the subnet work queue. The enqueue handlers
// set only newObj for an add, only oldObj for a delete and both for an update.
type subnetEvent struct {
	oldObj any
	newObj any
}
456

457
// serviceEvent is the item type of the service work queue. The enqueue handlers
// set only newObj for an add, only oldObj for a delete and both for an update.
type serviceEvent struct {
	oldObj any
	newObj any
}
460

461
func (c *Controller) enqueueAddSubnet(obj any) {
×
462
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
463
}
×
464

465
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
466
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
467
}
×
468

469
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
470
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
471
}
×
472

473
func (c *Controller) runSubnetWorker() {
×
474
        for c.processNextSubnetWorkItem() {
×
475
        }
×
476
}
477

478
func (c *Controller) enqueueAddService(obj any) {
×
479
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
480
}
×
481

482
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
483
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
484
}
×
485

486
func (c *Controller) enqueueDeleteService(obj any) {
×
487
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
488
}
×
489

490
func (c *Controller) runAddOrUpdateServicekWorker() {
×
491
        for c.processNextServiceWorkItem() {
×
492
        }
×
493
}
494

495
func (c *Controller) processNextSubnetWorkItem() bool {
×
496
        obj, shutdown := c.subnetQueue.Get()
×
497
        if shutdown {
×
498
                return false
×
499
        }
×
500

501
        err := func(obj *subnetEvent) error {
×
502
                defer c.subnetQueue.Done(obj)
×
503
                if err := c.reconcileRouters(obj); err != nil {
×
504
                        c.subnetQueue.AddRateLimited(obj)
×
505
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
506
                }
×
507
                c.subnetQueue.Forget(obj)
×
508
                return nil
×
509
        }(obj)
510
        if err != nil {
×
511
                utilruntime.HandleError(err)
×
512
                return true
×
513
        }
×
514
        return true
×
515
}
516

517
func (c *Controller) processNextServiceWorkItem() bool {
×
518
        obj, shutdown := c.serviceQueue.Get()
×
519
        if shutdown {
×
520
                return false
×
521
        }
×
522

523
        err := func(obj *serviceEvent) error {
×
524
                defer c.serviceQueue.Done(obj)
×
525
                if err := c.reconcileServices(obj); err != nil {
×
526
                        c.serviceQueue.AddRateLimited(obj)
×
527
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
528
                }
×
529
                c.serviceQueue.Forget(obj)
×
530
                return nil
×
531
        }(obj)
532
        if err != nil {
×
533
                utilruntime.HandleError(err)
×
534
                return true
×
535
        }
×
536
        return true
×
537
}
538

539
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
540
        oldPod := oldObj.(*v1.Pod)
×
541
        newPod := newObj.(*v1.Pod)
×
542
        key := cache.MetaObjectToName(newPod).String()
×
543

×
544
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
545
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
546
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
547
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
548
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
549
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
550
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
551
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
552
                c.updatePodQueue.Add(key)
×
553
                return
×
554
        }
×
555

556
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
557
        if err != nil {
×
558
                return
×
559
        }
×
560
        for _, multiNet := range attachNets {
×
561
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
562
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
563
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
564
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
565
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
566
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
567
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
568
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
569
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
570
                                c.updatePodQueue.Add(key)
×
571
                        }
×
572
                }
573
        }
574
}
575

576
func (c *Controller) enqueueDeletePod(obj any) {
×
NEW
577
        var pod *v1.Pod
×
NEW
578
        switch t := obj.(type) {
×
NEW
579
        case *v1.Pod:
×
NEW
580
                pod = t
×
NEW
581
        case cache.DeletedFinalStateUnknown:
×
NEW
582
                p, ok := t.Obj.(*v1.Pod)
×
NEW
583
                if !ok {
×
NEW
584
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
NEW
585
                        return
×
NEW
586
                }
×
NEW
587
                pod = p
×
NEW
588
        default:
×
NEW
589
                klog.Warningf("unexpected type: %T", obj)
×
NEW
590
                return
×
591
        }
592

593
        key := cache.MetaObjectToName(pod).String()
×
594
        c.deletePodQueue.Add(key)
×
595
}
596

597
func (c *Controller) runUpdatePodWorker() {
×
598
        for c.processNextUpdatePodWorkItem() {
×
599
        }
×
600
}
601

602
func (c *Controller) runDeletePodWorker() {
×
603
        for c.processNextDeletePodWorkItem() {
×
604
        }
×
605
}
606

607
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
608
        key, shutdown := c.updatePodQueue.Get()
×
609
        if shutdown {
×
610
                return false
×
611
        }
×
612

613
        err := func(key string) error {
×
614
                defer c.updatePodQueue.Done(key)
×
615
                if err := c.handleUpdatePod(key); err != nil {
×
616
                        c.updatePodQueue.AddRateLimited(key)
×
617
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
618
                }
×
619
                c.updatePodQueue.Forget(key)
×
620
                return nil
×
621
        }(key)
622
        if err != nil {
×
623
                utilruntime.HandleError(err)
×
624
                return true
×
625
        }
×
626
        return true
×
627
}
628

629
func (c *Controller) processNextDeletePodWorkItem() bool {
×
630
        key, shutdown := c.deletePodQueue.Get()
×
631
        if shutdown {
×
632
                return false
×
633
        }
×
634

635
        err := func(key string) error {
×
636
                defer c.deletePodQueue.Done(key)
×
637
                if err := c.handleDeletePod(key); err != nil {
×
638
                        c.deletePodQueue.AddRateLimited(key)
×
639
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
640
                }
×
641
                c.deletePodQueue.Forget(key)
×
642
                return nil
×
643
        }(key)
644
        if err != nil {
×
645
                utilruntime.HandleError(err)
×
646
                return true
×
647
        }
×
648
        return true
×
649
}
650

651
// lastNoPodOvsPort holds the residual internal ports that the previous
// markAndCleanInternalPort run marked; a marked port that is still residual on
// the following run is deleted.
var lastNoPodOvsPort map[string]bool
652

653
func (c *Controller) markAndCleanInternalPort() error {
×
654
        klog.V(4).Infof("start to gc ovs internal ports")
×
655
        residualPorts := ovs.GetResidualInternalPorts()
×
656
        if len(residualPorts) == 0 {
×
657
                return nil
×
658
        }
×
659

660
        noPodOvsPort := map[string]bool{}
×
661
        for _, portName := range residualPorts {
×
662
                if !lastNoPodOvsPort[portName] {
×
663
                        noPodOvsPort[portName] = true
×
664
                } else {
×
665
                        klog.Infof("gc ovs internal port %s", portName)
×
666
                        // Remove ovs port
×
667
                        output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName)
×
668
                        if err != nil {
×
669
                                return fmt.Errorf("failed to delete ovs port %w, %q", err, output)
×
670
                        }
×
671
                }
672
        }
673
        lastNoPodOvsPort = noPodOvsPort
×
674

×
675
        return nil
×
676
}
677

678
// Run starts controller
679
func (c *Controller) Run(stopCh <-chan struct{}) {
×
680
        defer utilruntime.HandleCrash()
×
681
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
682
        defer c.deleteProviderNetworkQueue.ShutDown()
×
683
        defer c.subnetQueue.ShutDown()
×
684
        defer c.serviceQueue.ShutDown()
×
685
        defer c.updatePodQueue.ShutDown()
×
686
        defer c.deletePodQueue.ShutDown()
×
687

×
688
        go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
×
689
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
690
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
691

×
692
        if err := c.setIPSet(); err != nil {
×
693
                util.LogFatalAndExit(err, "failed to set ipsets")
×
694
        }
×
695

696
        klog.Info("Started workers")
×
697
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
698
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
699
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
700
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
701
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
702
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
703
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
704
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
705
        go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
×
706
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
707
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
708
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
709
        go wait.Until(func() {
×
710
                if err := c.reconcileRouters(nil); err != nil {
×
711
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
712
                }
×
713
        }, 3*time.Second, stopCh)
714
        go wait.Until(func() {
×
715
                if err := c.markAndCleanInternalPort(); err != nil {
×
716
                        klog.Errorf("gc ovs port error: %v", err)
×
717
                }
×
718
        }, 5*time.Minute, stopCh)
719

720
        if c.config.EnableTProxy {
×
721
                go c.StartTProxyForwarding()
×
722
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
723
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
724
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
725
                // kubelet to tproxy, if probe success recover the iptable rules
×
726
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
727
        } else {
×
728
                c.cleanTProxyConfig()
×
729
        }
×
730

731
        if c.config.EnableOVNIPSec {
×
732
                go wait.Until(func() {
×
733
                        if err := c.ManageIPSecKeys(); err != nil {
×
734
                                klog.Errorf("manage ipsec keys error: %v", err)
×
735
                        }
×
736
                }, 24*time.Hour, stopCh)
737
        } else {
×
738
                if err := c.StopAndClearIPSecResouce(); err != nil {
×
739
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
740
                }
×
741
        }
742

743
        <-stopCh
×
744
        klog.Info("Shutting down workers")
×
745
}
746

747
func recompute() {
×
748
        output, err := exec.Command("ovn-appctl", "-t", "ovn-controller", "inc-engine/recompute").CombinedOutput()
×
749
        if err != nil {
×
750
                klog.Errorf("failed to recompute ovn-controller %q", output)
×
751
        }
×
752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc