• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 14372473322

10 Apr 2025 04:42AM UTC coverage: 21.704% (-0.3%) from 22.009%
14372473322

Pull #5110

github

zbb88888
fix fmt

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>
Pull Request #5110: enable check vlan conflict

0 of 178 new or added lines in 6 files covered. (0.0%)

1053 existing lines in 9 files now uncovered.

10263 of 47286 relevant lines covered (21.7%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "context"
5
        "fmt"
6
        "os/exec"
7
        "slices"
8
        "strconv"
9
        "time"
10

11
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
12
        "github.com/scylladb/go-set/strset"
13
        v1 "k8s.io/api/core/v1"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17
        "k8s.io/apimachinery/pkg/util/wait"
18
        "k8s.io/client-go/informers"
19
        "k8s.io/client-go/kubernetes/scheme"
20
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
21
        listerv1 "k8s.io/client-go/listers/core/v1"
22
        "k8s.io/client-go/tools/cache"
23
        "k8s.io/client-go/tools/record"
24
        "k8s.io/client-go/util/workqueue"
25
        "k8s.io/klog/v2"
26
        k8sexec "k8s.io/utils/exec"
27

28
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
29
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
30
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
31
        "github.com/kubeovn/kube-ovn/pkg/ovs"
32
        "github.com/kubeovn/kube-ovn/pkg/util"
33
)
34

35
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
36
type Controller struct {
37
        config *Configuration
38

39
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
40
        providerNetworksSynced          cache.InformerSynced
41
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
42
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
43

44
        vlansLister kubeovnlister.VlanLister
45
        vlansSynced cache.InformerSynced
46

47
        subnetsLister kubeovnlister.SubnetLister
48
        subnetsSynced cache.InformerSynced
49
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
50

51
        ovnEipsLister kubeovnlister.OvnEipLister
52
        ovnEipsSynced cache.InformerSynced
53

54
        podsLister listerv1.PodLister
55
        podsSynced cache.InformerSynced
56
        podQueue   workqueue.TypedRateLimitingInterface[string]
57

58
        nodesLister listerv1.NodeLister
59
        nodesSynced cache.InformerSynced
60

61
        recorder record.EventRecorder
62

63
        protocol string
64

65
        ControllerRuntime
66
        localPodName   string
67
        localNamespace string
68

69
        k8sExec k8sexec.Interface
70
}
71

72
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
73
        if rateLimiter == nil {
74
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
75
        }
76
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
UNCOV
77
}
×
UNCOV
78

×
UNCOV
79
// NewController init a daemon controller
×
80
func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) {
×
81
        eventBroadcaster := record.NewBroadcaster()
82
        eventBroadcaster.StartLogging(klog.Infof)
83
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
84
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
85

×
86
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
87
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
88
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
89
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
90
        podInformer := podInformerFactory.Core().V1().Pods()
×
91
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
92

×
93
        controller := &Controller{
×
94
                config: config,
×
95

×
96
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
97
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
98
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
99
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
100

×
101
                vlansLister: vlanInformer.Lister(),
×
102
                vlansSynced: vlanInformer.Informer().HasSynced,
×
103

×
104
                subnetsLister: subnetInformer.Lister(),
×
105
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
106
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
107

×
108
                ovnEipsLister: ovnEipInformer.Lister(),
×
109
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
110

×
111
                podsLister: podInformer.Lister(),
×
112
                podsSynced: podInformer.Informer().HasSynced,
×
113
                podQueue:   newTypedRateLimitingQueue[string]("Pod", nil),
×
114

×
115
                nodesLister: nodeInformer.Lister(),
×
116
                nodesSynced: nodeInformer.Informer().HasSynced,
×
117

×
118
                recorder: recorder,
×
119
                k8sExec:  k8sexec.New(),
×
120
        }
×
121

×
122
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
123
        if err != nil {
×
124
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
125
        }
×
126
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
127

×
128
        if err = controller.initRuntime(); err != nil {
×
129
                return nil, err
×
130
        }
×
UNCOV
131

×
132
        podInformerFactory.Start(stopCh)
×
133
        nodeInformerFactory.Start(stopCh)
×
134
        kubeovnInformerFactory.Start(stopCh)
×
135

×
136
        if !cache.WaitForCacheSync(stopCh,
×
137
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
138
                controller.podsSynced, controller.nodesSynced) {
×
139
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
140
        }
×
UNCOV
141

×
142
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
143
                AddFunc:    controller.enqueueAddProviderNetwork,
×
144
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
145
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
146
        }); err != nil {
×
147
                return nil, err
×
148
        }
×
149
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
150
                UpdateFunc: controller.enqueueUpdateVlan,
×
151
        }); err != nil {
×
152
                return nil, err
×
153
        }
×
154
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
155
                AddFunc:    controller.enqueueAddSubnet,
×
156
                UpdateFunc: controller.enqueueUpdateSubnet,
×
157
                DeleteFunc: controller.enqueueDeleteSubnet,
×
158
        }); err != nil {
×
159
                return nil, err
×
160
        }
×
161
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
162
                UpdateFunc: controller.enqueuePod,
×
163
        }); err != nil {
×
164
                return nil, err
×
165
        }
×
UNCOV
166

×
167
        return controller, nil
×
UNCOV
168
}
×
UNCOV
169

×
170
func (c *Controller) enqueueAddProviderNetwork(obj interface{}) {
×
171
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
172
        klog.V(3).Infof("enqueue add provider network %s", key)
×
173
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
174
}
×
UNCOV
175

×
176
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj interface{}) {
177
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
178
        klog.V(3).Infof("enqueue update provider network %s", key)
×
179
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
180
}
×
UNCOV
181

×
182
func (c *Controller) enqueueDeleteProviderNetwork(obj interface{}) {
183
        pn := obj.(*kubeovnv1.ProviderNetwork)
×
184
        key := cache.MetaObjectToName(pn).String()
185
        klog.V(3).Infof("enqueue delete provider network %s", key)
186
        c.deleteProviderNetworkQueue.Add(pn)
×
187
}
×
UNCOV
188

×
189
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
190
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
191
        }
UNCOV
192
}
×
UNCOV
193

×
194
func (c *Controller) runDeleteProviderNetworkWorker() {
×
195
        for c.processNextDeleteProviderNetworkWorkItem() {
×
196
        }
×
197
}
UNCOV
198

×
199
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
200
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
201
        if shutdown {
×
202
                return false
×
203
        }
×
204

205
        err := func(key string) error {
×
206
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
207
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
208
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
209
                }
210
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
211
                return nil
×
UNCOV
212
        }(key)
×
213
        if err != nil {
214
                utilruntime.HandleError(err)
215
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
216
                return true
×
217
        }
×
218
        return true
×
UNCOV
219
}
×
220

221
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
222
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
223
        if shutdown {
×
224
                return false
×
225
        }
×
UNCOV
226

×
227
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
228
                defer c.deleteProviderNetworkQueue.Done(obj)
229
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
230
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
231
                }
×
232
                c.deleteProviderNetworkQueue.Forget(obj)
×
233
                return nil
×
UNCOV
234
        }(obj)
×
235
        if err != nil {
236
                utilruntime.HandleError(err)
237
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
238
                return true
×
239
        }
×
240
        return true
×
UNCOV
241
}
×
242

243
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
244
        klog.V(3).Infof("handle update provider network %s", key)
×
245
        node, err := c.nodesLister.Get(c.config.NodeName)
×
246
        if err != nil {
×
247
                klog.Error(err)
×
248
                return err
×
249
        }
×
250
        pn, err := c.providerNetworksLister.Get(key)
251
        if err != nil {
×
252
                if k8serrors.IsNotFound(err) {
×
253
                        return nil
×
254
                }
×
255
                klog.Error(err)
×
256
                return err
×
257
        }
258

259
        if slices.Contains(pn.Spec.ExcludeNodes, node.Name) {
×
260
                c.recordProviderNetworkErr(pn.Name, "")
×
261
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
262
        }
×
263
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
UNCOV
264
}
×
UNCOV
265

×
266
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
267
        nic := pn.Spec.DefaultInterface
×
268
        for _, item := range pn.Spec.CustomInterfaces {
×
269
                if slices.Contains(item.Nodes, node.Name) {
×
270
                        nic = item.Interface
×
271
                        break
×
UNCOV
272
                }
×
273
        }
274

275
        patch := util.KVPatch{
×
276
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
277
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
278
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
279
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
280
        }
281

282
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
283
        for _, vlanName := range pn.Status.Vlans {
×
284
                vlan, err := c.vlansLister.Get(vlanName)
×
285
                if err != nil {
×
286
                        if k8serrors.IsNotFound(err) {
×
287
                                klog.Infof("vlan %s not found", vlanName)
×
288
                                continue
289
                        }
290
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
291
                        return err
×
UNCOV
292
                }
×
NEW
293
                if c.config.EnableCheckVlanConflict && c.config.IfaceVlanID > 0 && vlan.Spec.ID == c.config.IfaceVlanID {
×
NEW
294
                        err = fmt.Errorf("vlan %d is already used by tunnel interface on node %s", vlan.Spec.ID, node.Name)
×
NEW
295
                        klog.Error(err)
×
NEW
296
                        c.recordProviderNetworkErr(pn.Name, err.Error())
×
NEW
297
                        // tunnel interface using, so ovs can not use this vlan
×
NEW
298
                        return err
×
NEW
299
                }
×
UNCOV
300
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
UNCOV
301
        }
×
UNCOV
302
        // always add trunk 0 so that the ovs bridge can communicate with the external network
×
303
        vlans.Add("0")
×
304

×
305
        var mtu int
306
        var err error
×
307
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
308
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
309
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
310
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
311
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
312
                }
×
313
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
314
                return err
×
UNCOV
315
        }
×
UNCOV
316

×
317
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
318
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
319
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
320
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
321
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
322
                return err
×
323
        }
×
324
        c.recordProviderNetworkErr(pn.Name, "")
×
325
        return nil
×
UNCOV
326
}
×
UNCOV
327

×
328
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
329
        var currentPod *v1.Pod
×
330
        var err error
×
331
        if c.localPodName == "" {
332
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
333
                        LabelSelector: "app=kube-ovn-cni",
×
334
                        FieldSelector: fmt.Sprintf("spec.nodeName=%s", c.config.NodeName),
×
335
                })
×
336
                if err != nil {
×
337
                        klog.Errorf("failed to list pod: %v", err)
×
338
                        return
×
339
                }
×
340
                for _, pod := range pods.Items {
×
341
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
342
                                c.localPodName = pod.Name
343
                                c.localNamespace = pod.Namespace
344
                                currentPod = &pod
×
345
                                break
×
UNCOV
346
                        }
×
UNCOV
347
                }
×
348
                if currentPod == nil {
×
349
                        klog.Warning("failed to get self pod")
×
350
                        return
×
351
                }
×
352
        } else {
×
353
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
354
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
355
                        return
×
356
                }
×
UNCOV
357
        }
×
UNCOV
358

×
359
        patch := util.KVPatch{}
×
360
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
361
                if errMsg == "" {
×
362
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
363
                } else {
364
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
365
                }
×
366
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
367
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
368
                        return
×
369
                }
×
UNCOV
370
        }
×
UNCOV
371
}
×
UNCOV
372

×
373
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
374
        patch := util.KVPatch{
375
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
376
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
377
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
378
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
379
        }
×
380
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
381
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
382
                return err
×
383
        }
×
UNCOV
384

×
385
        return c.ovsCleanProviderNetwork(pn.Name)
×
386
}
387

388
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
389
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
390
                klog.Error(err)
×
391
                return err
×
392
        }
×
UNCOV
393

×
394
        node, err := c.nodesLister.Get(c.config.NodeName)
×
395
        if err != nil {
×
396
                klog.Error(err)
×
397
                return err
×
398
        }
×
399
        if len(node.Labels) == 0 {
×
400
                return nil
401
        }
×
402

403
        patch := util.KVPatch{
404
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
405
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
406
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
407
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
408
        }
×
409
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
410
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
411
                return err
×
412
        }
×
UNCOV
413

×
414
        return nil
×
UNCOV
415
}
×
UNCOV
416

×
417
func (c *Controller) enqueueUpdateVlan(oldObj, newObj interface{}) {
×
418
        oldVlan := oldObj.(*kubeovnv1.Vlan)
419
        newVlan := newObj.(*kubeovnv1.Vlan)
×
420
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
421
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
422
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
423
        }
×
UNCOV
424
}
×
UNCOV
425

×
UNCOV
426
type subnetEvent struct {
×
UNCOV
427
        oldObj, newObj interface{}
×
UNCOV
428
}
×
429

430
func (c *Controller) enqueueAddSubnet(obj interface{}) {
×
431
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
432
}
UNCOV
433

×
434
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj interface{}) {
×
435
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
436
}
×
UNCOV
437

×
438
func (c *Controller) enqueueDeleteSubnet(obj interface{}) {
×
439
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
440
}
441

442
func (c *Controller) runSubnetWorker() {
443
        for c.processNextSubnetWorkItem() {
444
        }
445
}
446

447
func (c *Controller) processNextSubnetWorkItem() bool {
448
        obj, shutdown := c.subnetQueue.Get()
449
        if shutdown {
450
                return false
×
451
        }
×
UNCOV
452

×
453
        err := func(obj *subnetEvent) error {
454
                defer c.subnetQueue.Done(obj)
×
455
                if err := c.reconcileRouters(obj); err != nil {
×
456
                        c.subnetQueue.AddRateLimited(obj)
×
457
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
458
                }
×
459
                c.subnetQueue.Forget(obj)
×
460
                return nil
×
461
        }(obj)
462
        if err != nil {
×
463
                utilruntime.HandleError(err)
×
464
                return true
×
465
        }
466
        return true
UNCOV
467
}
×
UNCOV
468

×
469
func (c *Controller) enqueuePod(oldObj, newObj interface{}) {
×
470
        oldPod := oldObj.(*v1.Pod)
471
        newPod := newObj.(*v1.Pod)
×
472
        key := cache.MetaObjectToName(newPod).String()
×
473

×
474
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
475
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
476
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
477
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
478
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
479
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
480
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] {
×
481
                c.podQueue.Add(key)
×
482
                return
483
        }
UNCOV
484

×
485
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
486
        if err != nil {
×
487
                return
×
488
        }
×
489
        for _, multiNet := range attachNets {
490
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
491
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
492
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
493
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
494
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
495
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
496
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
497
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
498
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
499
                                c.podQueue.Add(key)
×
500
                        }
×
UNCOV
501
                }
×
UNCOV
502
        }
×
UNCOV
503
}
×
504

505
func (c *Controller) runPodWorker() {
506
        for c.processNextPodWorkItem() {
×
507
        }
×
UNCOV
508
}
×
UNCOV
509

×
510
func (c *Controller) processNextPodWorkItem() bool {
×
511
        key, shutdown := c.podQueue.Get()
512
        if shutdown {
×
513
                return false
×
514
        }
×
UNCOV
515

×
516
        err := func(key string) error {
×
517
                defer c.podQueue.Done(key)
×
518
                if err := c.handlePod(key); err != nil {
×
519
                        c.podQueue.AddRateLimited(key)
×
520
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
521
                }
×
522
                c.podQueue.Forget(key)
×
523
                return nil
×
UNCOV
524
        }(key)
×
525
        if err != nil {
×
526
                utilruntime.HandleError(err)
527
                return true
528
        }
×
529
        return true
×
UNCOV
530
}
×
UNCOV
531

×
UNCOV
532
var lastNoPodOvsPort map[string]bool
×
UNCOV
533

×
534
func (c *Controller) markAndCleanInternalPort() error {
×
535
        klog.V(4).Infof("start to gc ovs internal ports")
×
536
        residualPorts := ovs.GetResidualInternalPorts()
×
537
        if len(residualPorts) == 0 {
×
538
                return nil
×
539
        }
×
UNCOV
540

×
541
        noPodOvsPort := map[string]bool{}
×
542
        for _, portName := range residualPorts {
×
543
                if !lastNoPodOvsPort[portName] {
544
                        noPodOvsPort[portName] = true
×
545
                } else {
×
546
                        klog.Infof("gc ovs internal port %s", portName)
×
547
                        // Remove ovs port
×
548
                        output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName)
×
549
                        if err != nil {
×
550
                                return fmt.Errorf("failed to delete ovs port %w, %q", err, output)
×
551
                        }
×
UNCOV
552
                }
×
UNCOV
553
        }
×
554
        lastNoPodOvsPort = noPodOvsPort
×
555

×
556
        return nil
×
UNCOV
557
}
×
UNCOV
558

×
UNCOV
559
// Run starts controller
×
560
func (c *Controller) Run(stopCh <-chan struct{}) {
561
        defer utilruntime.HandleCrash()
562
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
563
        defer c.deleteProviderNetworkQueue.ShutDown()
564
        defer c.subnetQueue.ShutDown()
×
565
        defer c.podQueue.ShutDown()
×
566

×
567
        go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
568
        go wait.Until(recompute, 10*time.Minute, stopCh)
569
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
570

×
571
        if err := c.setIPSet(); err != nil {
×
572
                util.LogFatalAndExit(err, "failed to set ipsets")
×
573
        }
×
574

575
        klog.Info("Started workers")
×
576
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
577
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
578
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
579
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
580
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
581
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
582
        go wait.Until(c.runPodWorker, time.Second, stopCh)
×
583
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
584
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
585
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
586
        go wait.Until(func() {
×
587
                if err := c.reconcileRouters(nil); err != nil {
×
588
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
589
                }
590
        }, 3*time.Second, stopCh)
591
        go wait.Until(func() {
592
                if err := c.markAndCleanInternalPort(); err != nil {
593
                        klog.Errorf("gc ovs port error: %v", err)
×
594
                }
×
UNCOV
595
        }, 5*time.Minute, stopCh)
×
UNCOV
596

×
597
        if c.config.EnableTProxy {
×
598
                go c.StartTProxyForwarding()
×
599
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
600
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
601
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
602
                // kubelet to tproxy, if probe success recover the iptable rules
×
603
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
604
        } else {
×
605
                c.cleanTProxyConfig()
×
606
        }
×
UNCOV
607

×
608
        if c.config.EnableOVNIPSec {
×
609
                go wait.Until(func() {
×
610
                        if err := c.ManageIPSecKeys(); err != nil {
×
611
                                klog.Errorf("manage ipsec keys error: %v", err)
612
                        }
UNCOV
613
                }, 24*time.Hour, stopCh)
×
614
        } else {
×
615
                if err := c.StopAndClearIPSecResouce(); err != nil {
×
616
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
617
                }
618
        }
UNCOV
619

×
NEW
620
        if c.config.EnableCheckVlanConflict {
×
NEW
621
                if err := c.patchNodeTunnelVlanLabel(); err != nil {
×
NEW
622
                        util.LogFatalAndExit(err, "failed to patch node tunnel vlan label")
×
NEW
623
                }
×
NEW
624
                go wait.Until(c.loopCheckVlanConflict, 60*time.Second, stopCh)
×
NEW
625
        }
×
NEW
626

×
627
        <-stopCh
×
628
        klog.Info("Shutting down workers")
×
UNCOV
629
}
×
UNCOV
630

×
631
func recompute() {
×
632
        output, err := exec.Command("ovn-appctl", "-t", "ovn-controller", "inc-engine/recompute").CombinedOutput()
×
633
        if err != nil {
×
634
                klog.Errorf("failed to recompute ovn-controller %q", output)
635
        }
×
UNCOV
636
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc