• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16612141263

30 Jul 2025 02:41AM UTC coverage: 21.453% (+0.03%) from 21.423%
16612141263

push

github

web-flow
support nodeselector for providerNetwork (#5518)

* support nodeselector for providerNetwork

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* add codegen

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

---------

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

22 of 39 new or added lines in 5 files covered. (56.41%)

2 existing lines in 1 file now uncovered.

10561 of 49229 relevant lines covered (21.45%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "os/exec"
9
        "slices"
10
        "strconv"
11
        "time"
12

13
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
19
        "k8s.io/apimachinery/pkg/util/wait"
20
        "k8s.io/client-go/informers"
21
        "k8s.io/client-go/kubernetes/scheme"
22
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
23
        listerv1 "k8s.io/client-go/listers/core/v1"
24
        "k8s.io/client-go/tools/cache"
25
        "k8s.io/client-go/tools/record"
26
        "k8s.io/client-go/util/workqueue"
27
        "k8s.io/klog/v2"
28
        k8sexec "k8s.io/utils/exec"
29

30
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
31
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
32
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
33
        "github.com/kubeovn/kube-ovn/pkg/ovs"
34
        "github.com/kubeovn/kube-ovn/pkg/util"
35
)
36

37
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
38
type Controller struct {
39
        config *Configuration
40

41
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
42
        providerNetworksSynced          cache.InformerSynced
43
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
44
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
45

46
        vlansLister kubeovnlister.VlanLister
47
        vlansSynced cache.InformerSynced
48

49
        subnetsLister kubeovnlister.SubnetLister
50
        subnetsSynced cache.InformerSynced
51
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
52

53
        ovnEipsLister kubeovnlister.OvnEipLister
54
        ovnEipsSynced cache.InformerSynced
55

56
        podsLister     listerv1.PodLister
57
        podsSynced     cache.InformerSynced
58
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
59
        deletePodQueue workqueue.TypedRateLimitingInterface[string]
60

61
        nodesLister listerv1.NodeLister
62
        nodesSynced cache.InformerSynced
63

64
        servicesLister listerv1.ServiceLister
65
        servicesSynced cache.InformerSynced
66
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
67

68
        caSecretLister listerv1.SecretLister
69
        caSecretSynced cache.InformerSynced
70
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
71

72
        recorder record.EventRecorder
73

74
        protocol string
75

76
        ControllerRuntime
77
        localPodName   string
78
        localNamespace string
79

80
        k8sExec k8sexec.Interface
81
}
82

83
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
84
        if rateLimiter == nil {
×
85
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
86
        }
×
87
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
88
}
89

90
// NewController init a daemon controller
91
func NewController(config *Configuration,
92
        stopCh <-chan struct{},
93
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
94
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
95
) (*Controller, error) {
×
96
        eventBroadcaster := record.NewBroadcaster()
×
97
        eventBroadcaster.StartLogging(klog.Infof)
×
98
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
99
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
100
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
101
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
102
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
103
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
104
        podInformer := podInformerFactory.Core().V1().Pods()
×
105
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
106
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
107
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
108

×
109
        controller := &Controller{
×
110
                config: config,
×
111

×
112
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
113
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
114
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
115
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
116

×
117
                vlansLister: vlanInformer.Lister(),
×
118
                vlansSynced: vlanInformer.Informer().HasSynced,
×
119

×
120
                subnetsLister: subnetInformer.Lister(),
×
121
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
122
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
123

×
124
                ovnEipsLister: ovnEipInformer.Lister(),
×
125
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
126

×
127
                podsLister:     podInformer.Lister(),
×
128
                podsSynced:     podInformer.Informer().HasSynced,
×
129
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
130
                deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
×
131

×
132
                nodesLister: nodeInformer.Lister(),
×
133
                nodesSynced: nodeInformer.Informer().HasSynced,
×
134

×
135
                servicesLister: servicesInformer.Lister(),
×
136
                servicesSynced: servicesInformer.Informer().HasSynced,
×
137
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
138

×
139
                caSecretLister: caSecretInformer.Lister(),
×
140
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
141
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
142

×
143
                recorder: recorder,
×
144
                k8sExec:  k8sexec.New(),
×
145
        }
×
146

×
147
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
148
        if err != nil {
×
149
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
150
        }
×
151
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
152

×
153
        if err = controller.initRuntime(); err != nil {
×
154
                return nil, err
×
155
        }
×
156

157
        podInformerFactory.Start(stopCh)
×
158
        nodeInformerFactory.Start(stopCh)
×
159
        kubeovnInformerFactory.Start(stopCh)
×
160
        caSecretInformerFactory.Start(stopCh)
×
161

×
162
        if !cache.WaitForCacheSync(stopCh,
×
163
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
164
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
165
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
166
        }
×
167

168
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
169
                AddFunc:    controller.enqueueAddProviderNetwork,
×
170
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
171
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
172
        }); err != nil {
×
173
                return nil, err
×
174
        }
×
175
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
176
                UpdateFunc: controller.enqueueUpdateVlan,
×
177
        }); err != nil {
×
178
                return nil, err
×
179
        }
×
180
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
181
                AddFunc:    controller.enqueueAddSubnet,
×
182
                UpdateFunc: controller.enqueueUpdateSubnet,
×
183
                DeleteFunc: controller.enqueueDeleteSubnet,
×
184
        }); err != nil {
×
185
                return nil, err
×
186
        }
×
187
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
188
                AddFunc:    controller.enqueueAddService,
×
189
                DeleteFunc: controller.enqueueDeleteService,
×
190
                UpdateFunc: controller.enqueueUpdateService,
×
191
        }); err != nil {
×
192
                util.LogFatalAndExit(err, "failed to add service event handler")
×
193
        }
×
194

195
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
196
                UpdateFunc: controller.enqueueUpdatePod,
×
197
                DeleteFunc: controller.enqueueDeletePod,
×
198
        }); err != nil {
×
199
                return nil, err
×
200
        }
×
201
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
202
                AddFunc:    controller.enqueueAddIPSecCA,
×
203
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
204
        }); err != nil {
×
205
                return nil, err
×
206
        }
×
207

208
        return controller, nil
×
209
}
210

211
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
212
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
213
        klog.V(3).Infof("enqueue add CA %s", key)
×
214
        c.ipsecQueue.Add(key)
×
215
}
×
216

217
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
218
        oldSecret := oldObj.(*v1.Secret)
×
219
        newSecret := newObj.(*v1.Secret)
×
220
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
221
                // No changes in CA data, no need to enqueue
×
222
                return
×
223
        }
×
224

225
        key := cache.MetaObjectToName(newSecret).String()
×
226
        klog.V(3).Infof("enqueue update CA %s", key)
×
227
        c.ipsecQueue.Add(key)
×
228
}
229

230
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
231
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
232
        klog.V(3).Infof("enqueue add provider network %s", key)
×
233
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
234
}
×
235

236
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
237
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
238
        klog.V(3).Infof("enqueue update provider network %s", key)
×
239
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
240
}
×
241

242
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
243
        pn := obj.(*kubeovnv1.ProviderNetwork)
×
244
        key := cache.MetaObjectToName(pn).String()
×
245
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
246
        c.deleteProviderNetworkQueue.Add(pn)
×
247
}
×
248

249
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
250
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
251
        }
×
252
}
253

254
func (c *Controller) runDeleteProviderNetworkWorker() {
×
255
        for c.processNextDeleteProviderNetworkWorkItem() {
×
256
        }
×
257
}
258

259
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
260
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
261
        if shutdown {
×
262
                return false
×
263
        }
×
264

265
        err := func(key string) error {
×
266
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
267
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
268
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
269
                }
×
270
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
271
                return nil
×
272
        }(key)
273
        if err != nil {
×
274
                utilruntime.HandleError(err)
×
275
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
276
                return true
×
277
        }
×
278
        return true
×
279
}
280

281
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
282
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
283
        if shutdown {
×
284
                return false
×
285
        }
×
286

287
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
288
                defer c.deleteProviderNetworkQueue.Done(obj)
×
289
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
290
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
291
                }
×
292
                c.deleteProviderNetworkQueue.Forget(obj)
×
293
                return nil
×
294
        }(obj)
295
        if err != nil {
×
296
                utilruntime.HandleError(err)
×
297
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
298
                return true
×
299
        }
×
300
        return true
×
301
}
302

303
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
304
        klog.V(3).Infof("handle update provider network %s", key)
×
305
        node, err := c.nodesLister.Get(c.config.NodeName)
×
306
        if err != nil {
×
307
                klog.Error(err)
×
308
                return err
×
309
        }
×
310
        pn, err := c.providerNetworksLister.Get(key)
×
311
        if err != nil {
×
312
                if k8serrors.IsNotFound(err) {
×
313
                        return nil
×
314
                }
×
315
                klog.Error(err)
×
316
                return err
×
317
        }
318

NEW
319
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
NEW
320
        if err != nil {
×
NEW
321
                klog.Error(err)
×
NEW
322
                return err
×
NEW
323
        }
×
324

NEW
325
        if excluded {
×
326
                c.recordProviderNetworkErr(pn.Name, "")
×
327
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
328
        }
×
329
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
330
}
331

332
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
333
        nic := pn.Spec.DefaultInterface
×
334
        for _, item := range pn.Spec.CustomInterfaces {
×
335
                if slices.Contains(item.Nodes, node.Name) {
×
336
                        nic = item.Interface
×
337
                        break
×
338
                }
339
        }
340

341
        patch := util.KVPatch{
×
342
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
343
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
344
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
345
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
346
        }
×
347

×
348
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
349
        for _, vlanName := range pn.Status.Vlans {
×
350
                vlan, err := c.vlansLister.Get(vlanName)
×
351
                if err != nil {
×
352
                        if k8serrors.IsNotFound(err) {
×
353
                                klog.Infof("vlan %s not found", vlanName)
×
354
                                continue
×
355
                        }
356
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
357
                        return err
×
358
                }
359
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
360
        }
361
        // always add trunk 0 so that the ovs bridge can communicate with the external network
362
        vlans.Add("0")
×
363

×
364
        var mtu int
×
365
        var err error
×
366
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
367
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
368
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
369
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
370
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
371
                }
×
372
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
373
                return err
×
374
        }
375

376
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
377
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
378
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
379
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
380
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
381
                return err
×
382
        }
×
383
        c.recordProviderNetworkErr(pn.Name, "")
×
384
        return nil
×
385
}
386

387
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
388
        var currentPod *v1.Pod
×
389
        var err error
×
390
        if c.localPodName == "" {
×
391
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
392
                        LabelSelector: "app=kube-ovn-cni",
×
393
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
394
                })
×
395
                if err != nil {
×
396
                        klog.Errorf("failed to list pod: %v", err)
×
397
                        return
×
398
                }
×
399
                for _, pod := range pods.Items {
×
400
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
401
                                c.localPodName = pod.Name
×
402
                                c.localNamespace = pod.Namespace
×
403
                                currentPod = &pod
×
404
                                break
×
405
                        }
406
                }
407
                if currentPod == nil {
×
408
                        klog.Warning("failed to get self pod")
×
409
                        return
×
410
                }
×
411
        } else {
×
412
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
413
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
414
                        return
×
415
                }
×
416
        }
417

418
        patch := util.KVPatch{}
×
419
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
420
                if errMsg == "" {
×
421
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
422
                } else {
×
423
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
424
                }
×
425
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
426
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
427
                        return
×
428
                }
×
429
        }
430
}
431

432
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
433
        patch := util.KVPatch{
×
434
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
435
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
436
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
437
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
438
        }
×
439
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
440
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
441
                return err
×
442
        }
×
443

444
        return c.ovsCleanProviderNetwork(pn.Name)
×
445
}
446

447
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
448
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
449
                klog.Error(err)
×
450
                return err
×
451
        }
×
452

453
        node, err := c.nodesLister.Get(c.config.NodeName)
×
454
        if err != nil {
×
455
                klog.Error(err)
×
456
                return err
×
457
        }
×
458
        if len(node.Labels) == 0 {
×
459
                return nil
×
460
        }
×
461

462
        patch := util.KVPatch{
×
463
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
464
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
465
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
466
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
467
        }
×
468
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
469
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
470
                return err
×
471
        }
×
472

473
        return nil
×
474
}
475

476
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
477
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
478
        newVlan := newObj.(*kubeovnv1.Vlan)
×
479
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
480
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
481
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
482
        }
×
483
}
484

485
type subnetEvent struct {
486
        oldObj, newObj any
487
}
488

489
type serviceEvent struct {
490
        oldObj, newObj any
491
}
492

493
func (c *Controller) enqueueAddSubnet(obj any) {
×
494
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
495
}
×
496

497
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
498
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
499
}
×
500

501
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
502
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
503
}
×
504

505
func (c *Controller) runSubnetWorker() {
×
506
        for c.processNextSubnetWorkItem() {
×
507
        }
×
508
}
509

510
func (c *Controller) enqueueAddService(obj any) {
×
511
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
512
}
×
513

514
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
515
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
516
}
×
517

518
func (c *Controller) enqueueDeleteService(obj any) {
×
519
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
520
}
×
521

522
func (c *Controller) runAddOrUpdateServicekWorker() {
×
523
        for c.processNextServiceWorkItem() {
×
524
        }
×
525
}
526

527
func (c *Controller) processNextSubnetWorkItem() bool {
×
528
        obj, shutdown := c.subnetQueue.Get()
×
529
        if shutdown {
×
530
                return false
×
531
        }
×
532

533
        err := func(obj *subnetEvent) error {
×
534
                defer c.subnetQueue.Done(obj)
×
535
                if err := c.reconcileRouters(obj); err != nil {
×
536
                        c.subnetQueue.AddRateLimited(obj)
×
537
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
538
                }
×
539
                c.subnetQueue.Forget(obj)
×
540
                return nil
×
541
        }(obj)
542
        if err != nil {
×
543
                utilruntime.HandleError(err)
×
544
                return true
×
545
        }
×
546
        return true
×
547
}
548

549
func (c *Controller) processNextServiceWorkItem() bool {
×
550
        obj, shutdown := c.serviceQueue.Get()
×
551
        if shutdown {
×
552
                return false
×
553
        }
×
554

555
        err := func(obj *serviceEvent) error {
×
556
                defer c.serviceQueue.Done(obj)
×
557
                if err := c.reconcileServices(obj); err != nil {
×
558
                        c.serviceQueue.AddRateLimited(obj)
×
559
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
560
                }
×
561
                c.serviceQueue.Forget(obj)
×
562
                return nil
×
563
        }(obj)
564
        if err != nil {
×
565
                utilruntime.HandleError(err)
×
566
                return true
×
567
        }
×
568
        return true
×
569
}
570

571
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
572
        oldPod := oldObj.(*v1.Pod)
×
573
        newPod := newObj.(*v1.Pod)
×
574
        key := cache.MetaObjectToName(newPod).String()
×
575

×
576
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
577
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
578
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
579
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
580
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
581
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
582
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
583
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
584
                c.updatePodQueue.Add(key)
×
585
                return
×
586
        }
×
587

588
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
589
        if err != nil {
×
590
                return
×
591
        }
×
592
        for _, multiNet := range attachNets {
×
593
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
594
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
595
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
596
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
597
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
598
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
599
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
600
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
601
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
602
                                c.updatePodQueue.Add(key)
×
603
                        }
×
604
                }
605
        }
606
}
607

608
func (c *Controller) enqueueDeletePod(obj any) {
×
609
        pod := obj.(*v1.Pod)
×
610
        key := cache.MetaObjectToName(pod).String()
×
611
        c.deletePodQueue.Add(key)
×
612
}
×
613

614
func (c *Controller) runUpdatePodWorker() {
×
615
        for c.processNextUpdatePodWorkItem() {
×
616
        }
×
617
}
618

619
func (c *Controller) runDeletePodWorker() {
×
620
        for c.processNextDeletePodWorkItem() {
×
621
        }
×
622
}
623

624
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
625
        key, shutdown := c.updatePodQueue.Get()
×
626
        if shutdown {
×
627
                return false
×
628
        }
×
629

630
        err := func(key string) error {
×
631
                defer c.updatePodQueue.Done(key)
×
632
                if err := c.handleUpdatePod(key); err != nil {
×
633
                        c.updatePodQueue.AddRateLimited(key)
×
634
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
635
                }
×
636
                c.updatePodQueue.Forget(key)
×
637
                return nil
×
638
        }(key)
639
        if err != nil {
×
640
                utilruntime.HandleError(err)
×
641
                return true
×
642
        }
×
643
        return true
×
644
}
645

646
func (c *Controller) processNextDeletePodWorkItem() bool {
×
647
        key, shutdown := c.deletePodQueue.Get()
×
648
        if shutdown {
×
649
                return false
×
650
        }
×
651

652
        err := func(key string) error {
×
653
                defer c.deletePodQueue.Done(key)
×
654
                if err := c.handleDeletePod(key); err != nil {
×
655
                        c.deletePodQueue.AddRateLimited(key)
×
656
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
657
                }
×
658
                c.deletePodQueue.Forget(key)
×
659
                return nil
×
660
        }(key)
661
        if err != nil {
×
662
                utilruntime.HandleError(err)
×
663
                return true
×
664
        }
×
665
        return true
×
666
}
667

668
var lastNoPodOvsPort map[string]bool
669

670
func (c *Controller) markAndCleanInternalPort() error {
×
671
        klog.V(4).Infof("start to gc ovs internal ports")
×
672
        residualPorts := ovs.GetResidualInternalPorts()
×
673
        if len(residualPorts) == 0 {
×
674
                return nil
×
675
        }
×
676

677
        noPodOvsPort := map[string]bool{}
×
678
        for _, portName := range residualPorts {
×
679
                if !lastNoPodOvsPort[portName] {
×
680
                        noPodOvsPort[portName] = true
×
681
                } else {
×
682
                        klog.Infof("gc ovs internal port %s", portName)
×
683
                        // Remove ovs port
×
684
                        output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName)
×
685
                        if err != nil {
×
686
                                return fmt.Errorf("failed to delete ovs port %w, %q", err, output)
×
687
                        }
×
688
                }
689
        }
690
        lastNoPodOvsPort = noPodOvsPort
×
691

×
692
        return nil
×
693
}
694

695
func (c *Controller) runIPSecWorker() {
×
696
        if err := c.StartIPSecService(); err != nil {
×
697
                klog.Errorf("starting ipsec service: %v", err)
×
698
        }
×
699

700
        for c.processNextIPSecWorkItem() {
×
701
        }
×
702
}
703

704
func (c *Controller) processNextIPSecWorkItem() bool {
×
705
        key, shutdown := c.ipsecQueue.Get()
×
706
        if shutdown {
×
707
                return false
×
708
        }
×
709
        defer c.ipsecQueue.Done(key)
×
710

×
711
        err := func(key string) error {
×
712
                if err := c.SyncIPSecKeys(key); err != nil {
×
713
                        c.ipsecQueue.AddRateLimited(key)
×
714
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
715
                }
×
716
                c.ipsecQueue.Forget(key)
×
717
                return nil
×
718
        }(key)
719
        if err != nil {
×
720
                utilruntime.HandleError(err)
×
721
                return true
×
722
        }
×
723
        return true
×
724
}
725

726
// Run starts controller
727
func (c *Controller) Run(stopCh <-chan struct{}) {
×
728
        defer utilruntime.HandleCrash()
×
729
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
730
        defer c.deleteProviderNetworkQueue.ShutDown()
×
731
        defer c.subnetQueue.ShutDown()
×
732
        defer c.serviceQueue.ShutDown()
×
733
        defer c.updatePodQueue.ShutDown()
×
734
        defer c.deletePodQueue.ShutDown()
×
735
        defer c.ipsecQueue.ShutDown()
×
736
        go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
×
737
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
738
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
739

×
740
        if err := c.setIPSet(); err != nil {
×
741
                util.LogFatalAndExit(err, "failed to set ipsets")
×
742
        }
×
743

744
        klog.Info("Started workers")
×
745
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
746
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
747
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
748
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
749
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
750
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
751
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
752
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
753
        go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
×
754
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
755
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
756
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
757
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
758
        go wait.Until(func() {
×
759
                if err := c.reconcileRouters(nil); err != nil {
×
760
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
761
                }
×
762
        }, 3*time.Second, stopCh)
763
        go wait.Until(func() {
×
764
                if err := c.markAndCleanInternalPort(); err != nil {
×
765
                        klog.Errorf("gc ovs port error: %v", err)
×
766
                }
×
767
        }, 5*time.Minute, stopCh)
768

769
        if c.config.EnableTProxy {
×
770
                go c.StartTProxyForwarding()
×
771
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
772
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
773
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
774
                // kubelet to tproxy, if probe success recover the iptable rules
×
775
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
776
        } else {
×
777
                c.cleanTProxyConfig()
×
778
        }
×
779

780
        if !c.config.EnableOVNIPSec {
×
781
                if err := c.StopAndClearIPSecResource(); err != nil {
×
782
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
783
                }
×
784
        }
785

786
        <-stopCh
×
787
        klog.Info("Shutting down workers")
×
788
}
789

790
func recompute() {
×
791
        output, err := exec.Command("ovn-appctl", "-t", "ovn-controller", "inc-engine/recompute").CombinedOutput()
×
792
        if err != nil {
×
793
                klog.Errorf("failed to recompute ovn-controller %q", output)
×
794
        }
×
795
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc