• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21198867681

21 Jan 2026 05:50AM UTC coverage: 22.721% (-0.2%) from 22.889%
21198867681

Pull #6153

github

zbb88888
fix: cni eip cache mem leak

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>
Pull Request #6153: feat(controller): allow SSH from node to Iptables eip

37 of 564 new or added lines in 6 files covered. (6.56%)

66 existing lines in 1 file now uncovered.

12336 of 54293 relevant lines covered (22.72%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        listerv1 "k8s.io/client-go/listers/core/v1"
26
        "k8s.io/client-go/tools/cache"
27
        "k8s.io/client-go/tools/record"
28
        "k8s.io/client-go/util/workqueue"
29
        "k8s.io/klog/v2"
30
        k8sexec "k8s.io/utils/exec"
31
        kubevirtv1 "kubevirt.io/api/core/v1"
32

33
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
34
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
35
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
36
        "github.com/kubeovn/kube-ovn/pkg/ovs"
37
        "github.com/kubeovn/kube-ovn/pkg/util"
38
)
39

40
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
41
type Controller struct {
42
        config *Configuration
43

44
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
45
        providerNetworksSynced          cache.InformerSynced
46
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
47
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
48

49
        vlansLister kubeovnlister.VlanLister
50
        vlansSynced cache.InformerSynced
51

52
        subnetsLister kubeovnlister.SubnetLister
53
        subnetsSynced cache.InformerSynced
54
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
55

56
        macvlanSubnetQueue workqueue.TypedRateLimitingInterface[*subnetEvent]
57

58
        ovnEipsLister kubeovnlister.OvnEipLister
59
        ovnEipsSynced cache.InformerSynced
60

61
        podsLister     listerv1.PodLister
62
        podsSynced     cache.InformerSynced
63
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
64

65
        nodesLister     listerv1.NodeLister
66
        nodesSynced     cache.InformerSynced
67
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
68

69
        servicesLister listerv1.ServiceLister
70
        servicesSynced cache.InformerSynced
71
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
72

73
        caSecretLister listerv1.SecretLister
74
        caSecretSynced cache.InformerSynced
75
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
76

77
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
78
        iptablesEipsSynced     cache.InformerSynced
79
        iptablesEipQueue       workqueue.TypedRateLimitingInterface[eipRouteInfo]
80
        iptablesEipDeleteQueue workqueue.TypedRateLimitingInterface[eipRouteInfo]
81

82
        // deletedEIPs tracks EIPs that have been deleted to prevent race conditions
83
        // between delete events and queued add events. Entries are cleaned up after
84
        // successful route deletion to prevent memory leaks.
85
        deletedEIPs sync.Map
86

87
        recorder record.EventRecorder
88

89
        protocol string
90

91
        ControllerRuntime
92

93
        k8sExec k8sexec.Interface
94
}
95

96
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
97
        if rateLimiter == nil {
×
98
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
99
        }
×
100
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
101
}
102

103
// NewController init a daemon controller
104
func NewController(config *Configuration,
105
        stopCh <-chan struct{},
106
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
107
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
108
) (*Controller, error) {
×
109
        eventBroadcaster := record.NewBroadcaster()
×
110
        eventBroadcaster.StartLogging(klog.Infof)
×
111
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
112
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
113
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
114
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
115
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
116
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
NEW
117
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
118
        podInformer := podInformerFactory.Core().V1().Pods()
×
119
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
120
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
121
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
122

×
123
        controller := &Controller{
×
124
                config: config,
×
125

×
126
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
127
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
128
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
129
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
130

×
131
                vlansLister: vlanInformer.Lister(),
×
132
                vlansSynced: vlanInformer.Informer().HasSynced,
×
133

×
134
                subnetsLister: subnetInformer.Lister(),
×
135
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
136
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
137

×
NEW
138
                macvlanSubnetQueue: newTypedRateLimitingQueue[*subnetEvent]("MacvlanSubnet", nil),
×
NEW
139

×
140
                ovnEipsLister: ovnEipInformer.Lister(),
×
141
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
142

×
143
                podsLister:     podInformer.Lister(),
×
144
                podsSynced:     podInformer.Informer().HasSynced,
×
145
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
146

×
147
                nodesLister:     nodeInformer.Lister(),
×
148
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
149
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
150

×
151
                servicesLister: servicesInformer.Lister(),
×
152
                servicesSynced: servicesInformer.Informer().HasSynced,
×
153
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
154

×
155
                caSecretLister: caSecretInformer.Lister(),
×
156
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
157
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
158

×
NEW
159
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
NEW
160
                iptablesEipsSynced:     iptablesEipInformer.Informer().HasSynced,
×
NEW
161
                iptablesEipQueue:       newTypedRateLimitingQueue[eipRouteInfo]("IptablesEip", nil),
×
NEW
162
                iptablesEipDeleteQueue: newTypedRateLimitingQueue[eipRouteInfo]("IptablesEipDelete", nil),
×
NEW
163

×
164
                recorder: recorder,
×
165
                k8sExec:  k8sexec.New(),
×
166
        }
×
167

×
168
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
169
        if err != nil {
×
170
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
171
        }
×
172
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
173

×
174
        if err = controller.initRuntime(); err != nil {
×
175
                return nil, err
×
176
        }
×
177

178
        podInformerFactory.Start(stopCh)
×
179
        nodeInformerFactory.Start(stopCh)
×
180
        kubeovnInformerFactory.Start(stopCh)
×
181
        caSecretInformerFactory.Start(stopCh)
×
182

×
183
        if !cache.WaitForCacheSync(stopCh,
×
184
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
NEW
185
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced,
×
NEW
186
                controller.iptablesEipsSynced) {
×
187
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
188
        }
×
189

190
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
191
                AddFunc:    controller.enqueueAddProviderNetwork,
×
192
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
193
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
194
        }); err != nil {
×
195
                return nil, err
×
196
        }
×
197
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
198
                UpdateFunc: controller.enqueueUpdateVlan,
×
199
        }); err != nil {
×
200
                return nil, err
×
201
        }
×
202
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
203
                AddFunc:    controller.enqueueAddSubnet,
×
204
                UpdateFunc: controller.enqueueUpdateSubnet,
×
205
                DeleteFunc: controller.enqueueDeleteSubnet,
×
206
        }); err != nil {
×
207
                return nil, err
×
208
        }
×
209
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
210
                AddFunc:    controller.enqueueAddService,
×
211
                DeleteFunc: controller.enqueueDeleteService,
×
212
                UpdateFunc: controller.enqueueUpdateService,
×
213
        }); err != nil {
×
214
                util.LogFatalAndExit(err, "failed to add service event handler")
×
215
        }
×
216

217
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
218
                UpdateFunc: controller.enqueueUpdatePod,
×
219
        }); err != nil {
×
220
                return nil, err
×
221
        }
×
222
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
223
                AddFunc:    controller.enqueueAddIPSecCA,
×
224
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
225
        }); err != nil {
×
226
                return nil, err
×
227
        }
×
228
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
229
                UpdateFunc: controller.enqueueUpdateNode,
×
230
        }); err != nil {
×
231
                return nil, err
×
232
        }
×
NEW
233
        if config.EnableNodeLocalAccessVpcNatGwEIP {
×
NEW
234
                if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
NEW
235
                        AddFunc:    controller.enqueueAddIptablesEip,
×
NEW
236
                        UpdateFunc: controller.enqueueUpdateIptablesEip,
×
NEW
237
                        DeleteFunc: controller.enqueueDeleteIptablesEip,
×
NEW
238
                }); err != nil {
×
NEW
239
                        return nil, err
×
NEW
240
                }
×
241
        }
242

243
        return controller, nil
×
244
}
245

246
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
247
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
248
        klog.V(3).Infof("enqueue add CA %s", key)
×
249
        c.ipsecQueue.Add(key)
×
250
}
×
251

252
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
253
        oldSecret := oldObj.(*v1.Secret)
×
254
        newSecret := newObj.(*v1.Secret)
×
255
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
256
                // No changes in CA data, no need to enqueue
×
257
                return
×
258
        }
×
259

260
        key := cache.MetaObjectToName(newSecret).String()
×
261
        klog.V(3).Infof("enqueue update CA %s", key)
×
262
        c.ipsecQueue.Add(key)
×
263
}
264

265
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
266
        oldNode := oldObj.(*v1.Node)
×
267
        newNode := newObj.(*v1.Node)
×
268
        if newNode.Name != c.config.NodeName {
×
269
                return
×
270
        }
×
271
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
272
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
273
                c.updateNodeQueue.Add(newNode.Name)
×
274
        }
×
275
}
276

277
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
278
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
279
        klog.V(3).Infof("enqueue add provider network %s", key)
×
280
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
281
}
×
282

283
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
284
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
285
        klog.V(3).Infof("enqueue update provider network %s", key)
×
286
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
287
}
×
288

289
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
290
        var pn *kubeovnv1.ProviderNetwork
×
291
        switch t := obj.(type) {
×
292
        case *kubeovnv1.ProviderNetwork:
×
293
                pn = t
×
294
        case cache.DeletedFinalStateUnknown:
×
295
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
296
                if !ok {
×
297
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
298
                        return
×
299
                }
×
300
                pn = p
×
301
        default:
×
302
                klog.Warningf("unexpected type: %T", obj)
×
303
                return
×
304
        }
305

306
        key := cache.MetaObjectToName(pn).String()
×
307
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
308
        c.deleteProviderNetworkQueue.Add(pn)
×
309
}
310

311
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
312
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
313
        }
×
314
}
315

316
func (c *Controller) runDeleteProviderNetworkWorker() {
×
317
        for c.processNextDeleteProviderNetworkWorkItem() {
×
318
        }
×
319
}
320

321
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
322
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
323
        if shutdown {
×
324
                return false
×
325
        }
×
326

327
        err := func(key string) error {
×
328
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
329
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
330
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
331
                }
×
332
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
333
                return nil
×
334
        }(key)
335
        if err != nil {
×
336
                utilruntime.HandleError(err)
×
337
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
338
                return true
×
339
        }
×
340
        return true
×
341
}
342

343
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
344
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
345
        if shutdown {
×
346
                return false
×
347
        }
×
348

349
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
350
                defer c.deleteProviderNetworkQueue.Done(obj)
×
351
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
352
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
353
                }
×
354
                c.deleteProviderNetworkQueue.Forget(obj)
×
355
                return nil
×
356
        }(obj)
357
        if err != nil {
×
358
                utilruntime.HandleError(err)
×
359
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
360
                return true
×
361
        }
×
362
        return true
×
363
}
364

365
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
366
        klog.V(3).Infof("handle update provider network %s", key)
×
367
        node, err := c.nodesLister.Get(c.config.NodeName)
×
368
        if err != nil {
×
369
                klog.Error(err)
×
370
                return err
×
371
        }
×
372
        pn, err := c.providerNetworksLister.Get(key)
×
373
        if err != nil {
×
374
                if k8serrors.IsNotFound(err) {
×
375
                        return nil
×
376
                }
×
377
                klog.Error(err)
×
378
                return err
×
379
        }
380

381
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
382
        if err != nil {
×
383
                klog.Error(err)
×
384
                return err
×
385
        }
×
386

387
        if excluded {
×
388
                c.recordProviderNetworkErr(pn.Name, "")
×
389
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
390
        }
×
391
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
392
}
393

394
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
395
        nic := pn.Spec.DefaultInterface
×
396
        for _, item := range pn.Spec.CustomInterfaces {
×
397
                if slices.Contains(item.Nodes, node.Name) {
×
398
                        nic = item.Interface
×
399
                        break
×
400
                }
401
        }
402

403
        patch := util.KVPatch{
×
404
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
405
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
406
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
407
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
408
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
409
        }
×
410

×
411
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
412
        for _, vlanName := range pn.Status.Vlans {
×
413
                vlan, err := c.vlansLister.Get(vlanName)
×
414
                if err != nil {
×
415
                        if k8serrors.IsNotFound(err) {
×
416
                                klog.Infof("vlan %s not found", vlanName)
×
417
                                continue
×
418
                        }
419
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
420
                        return err
×
421
                }
422
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
423
        }
424
        // always add trunk 0 so that the ovs bridge can communicate with the external network
425
        vlans.Add("0")
×
426

×
427
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
428
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
429
                parts := strings.SplitN(nic, ".", 2)
×
430
                parentIf := parts[0]
×
431
                if !util.CheckInterfaceExists(nic) {
×
432
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
433
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
434
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
435
                                return err
×
436
                        }
×
437
                } else {
×
438
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
439
                }
×
440
        }
441

442
        // VLAN sub-interface handling - use map for efficiency
443
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
444

×
445
        // Process explicitly specified VLAN interfaces
×
446
        if len(pn.Spec.VlanInterfaces) > 0 {
×
447
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
448
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
449
                        if util.CheckInterfaceExists(vlanIfName) {
×
450
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
451
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
452
                                if err != nil {
×
453
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
454
                                        continue
×
455
                                }
456
                                vlanInterfaceMap[vlanIfName] = vlanID
×
457
                                vlans.Add(strconv.Itoa(vlanID))
×
458
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
459
                        } else {
×
460
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
461
                        }
×
462
                }
463
        }
464

465
        // Auto-detection of additional VLAN interfaces (if enabled)
466
        if pn.Spec.PreserveVlanInterfaces {
×
467
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
468
                vlanIDs := util.DetectVlanInterfaces(nic)
×
469
                for _, vlanID := range vlanIDs {
×
470
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
471
                        // Only add if not already explicitly specified
×
472
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
473
                                vlanInterfaceMap[vlanIfName] = vlanID
×
474
                                vlans.Add(strconv.Itoa(vlanID))
×
475
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
476
                        } else {
×
477
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
478
                        }
×
479
                }
480
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
481
        }
482

483
        var mtu int
×
484
        var err error
×
485
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
486
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
487
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
488
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
489
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
490
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
491
                }
×
492
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
493
                return err
×
494
        }
495

496
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
497
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
498
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
499
        if len(vlanInterfaceMap) > 0 {
×
500
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
501
        }
×
502
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
503
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
504
                return err
×
505
        }
×
506
        c.recordProviderNetworkErr(pn.Name, "")
×
507
        return nil
×
508
}
509

510
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
511
        pod, err := c.podsLister.Pods(c.config.PodNamespace).Get(c.config.PodName)
×
512
        if err != nil {
×
513
                klog.Errorf("failed to get pod %s/%s, %v", c.config.PodNamespace, c.config.PodName, err)
×
514
                return
×
515
        }
×
516

517
        patch := util.KVPatch{}
×
518
        if pod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
519
                if errMsg == "" {
×
520
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
521
                } else {
×
522
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
523
                }
×
524
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
525
                        klog.Errorf("failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
526
                        return
×
527
                }
×
528
        }
529
}
530

531
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
532
        patch := util.KVPatch{
×
533
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
534
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
535
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
536
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
537
        }
×
538
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
539
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
540
                return err
×
541
        }
×
542

543
        return c.ovsCleanProviderNetwork(pn.Name)
×
544
}
545

546
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
547
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
548
                klog.Error(err)
×
549
                return err
×
550
        }
×
551

552
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name); err != nil {
×
553
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
554
                return err
×
555
        }
×
556

557
        node, err := c.nodesLister.Get(c.config.NodeName)
×
558
        if err != nil {
×
559
                klog.Error(err)
×
560
                return err
×
561
        }
×
562
        if len(node.Labels) == 0 {
×
563
                return nil
×
564
        }
×
565

566
        patch := util.KVPatch{
×
567
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
568
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
569
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
570
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
571
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
572
        }
×
573
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
574
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
575
                return err
×
576
        }
×
577

578
        return nil
×
579
}
580

581
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
582
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
583
        newVlan := newObj.(*kubeovnv1.Vlan)
×
584
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
585
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
586
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
587
        }
×
588
}
589

590
// subnetEvent is the work item for subnet changes. An add sets only newObj, a
// delete sets only oldObj, and an update sets both.
type subnetEvent struct {
	oldObj any
	newObj any
}
593

594
// serviceEvent is the work item for service changes. An add sets only newObj,
// a delete sets only oldObj, and an update sets both.
type serviceEvent struct {
	oldObj any
	newObj any
}
597

598
// subnetHasMacvlanMaster checks if a subnet has NadMacvlanMasterAnnotation set
NEW
599
func subnetHasMacvlanMaster(obj any) bool {
×
NEW
600
        if subnet, ok := obj.(*kubeovnv1.Subnet); ok {
×
NEW
601
                return subnet.Annotations[util.NadMacvlanMasterAnnotation] != ""
×
NEW
602
        }
×
NEW
603
        return false
×
604
}
605

606
func (c *Controller) enqueueAddSubnet(obj any) {
×
NEW
607
        event := &subnetEvent{newObj: obj}
×
NEW
608
        c.subnetQueue.Add(event)
×
NEW
609
        if c.config.EnableNodeLocalAccessVpcNatGwEIP && subnetHasMacvlanMaster(obj) {
×
NEW
610
                c.macvlanSubnetQueue.Add(event)
×
NEW
611
        }
×
612
}
613

UNCOV
614
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
NEW
UNCOV
615
        event := &subnetEvent{oldObj: oldObj, newObj: newObj}
×
NEW
616
        c.subnetQueue.Add(event)
×
NEW
617
        if c.config.EnableNodeLocalAccessVpcNatGwEIP && (subnetHasMacvlanMaster(oldObj) || subnetHasMacvlanMaster(newObj)) {
×
NEW
618
                c.macvlanSubnetQueue.Add(event)
×
NEW
619
        }
×
620
}
621

UNCOV
622
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
NEW
UNCOV
623
        event := &subnetEvent{oldObj: obj}
×
NEW
624
        c.subnetQueue.Add(event)
×
NEW
625
        if c.config.EnableNodeLocalAccessVpcNatGwEIP && subnetHasMacvlanMaster(obj) {
×
NEW
626
                c.macvlanSubnetQueue.Add(event)
×
NEW
627
        }
×
628
}
629

UNCOV
630
func (c *Controller) runSubnetWorker() {
×
UNCOV
631
        for c.processNextSubnetWorkItem() {
×
632
        }
×
633
}
634

NEW
UNCOV
635
func (c *Controller) runMacvlanSubnetWorker() {
×
NEW
UNCOV
636
        for c.processNextMacvlanSubnetItem() {
×
NEW
637
        }
×
638
}
639

NEW
640
func (c *Controller) processNextMacvlanSubnetItem() bool {
×
NEW
641
        obj, shutdown := c.macvlanSubnetQueue.Get()
×
NEW
642
        if shutdown {
×
NEW
643
                return false
×
NEW
644
        }
×
645

NEW
646
        err := func(obj *subnetEvent) error {
×
NEW
647
                defer c.macvlanSubnetQueue.Done(obj)
×
NEW
648
                if err := c.reconcileMacvlanSubnet(obj); err != nil {
×
NEW
649
                        c.macvlanSubnetQueue.AddRateLimited(obj)
×
NEW
650
                        return fmt.Errorf("error syncing macvlan for %v: %w, requeuing", obj, err)
×
NEW
651
                }
×
NEW
652
                c.macvlanSubnetQueue.Forget(obj)
×
NEW
653
                return nil
×
654
        }(obj)
NEW
655
        if err != nil {
×
NEW
656
                utilruntime.HandleError(err)
×
NEW
657
                return true
×
NEW
658
        }
×
NEW
659
        return true
×
660
}
661

662
func (c *Controller) enqueueAddService(obj any) {
×
663
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
664
}
×
665

666
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
UNCOV
667
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
668
}
×
669

670
func (c *Controller) enqueueDeleteService(obj any) {
×
UNCOV
671
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
672
}
×
673

674
func (c *Controller) runAddOrUpdateServicekWorker() {
×
UNCOV
675
        for c.processNextServiceWorkItem() {
×
676
        }
×
677
}
678

UNCOV
679
func (c *Controller) processNextSubnetWorkItem() bool {
×
UNCOV
680
        obj, shutdown := c.subnetQueue.Get()
×
681
        if shutdown {
×
682
                return false
×
683
        }
×
684

685
        err := func(obj *subnetEvent) error {
×
UNCOV
686
                defer c.subnetQueue.Done(obj)
×
687
                if err := c.reconcileRouters(obj); err != nil {
×
688
                        c.subnetQueue.AddRateLimited(obj)
×
689
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
690
                }
×
691
                c.subnetQueue.Forget(obj)
×
692
                return nil
×
693
        }(obj)
694
        if err != nil {
×
UNCOV
695
                utilruntime.HandleError(err)
×
696
                return true
×
697
        }
×
698
        return true
×
699
}
700

UNCOV
701
func (c *Controller) processNextServiceWorkItem() bool {
×
UNCOV
702
        obj, shutdown := c.serviceQueue.Get()
×
703
        if shutdown {
×
704
                return false
×
705
        }
×
706

707
        err := func(obj *serviceEvent) error {
×
UNCOV
708
                defer c.serviceQueue.Done(obj)
×
709
                if err := c.reconcileServices(obj); err != nil {
×
710
                        c.serviceQueue.AddRateLimited(obj)
×
711
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
712
                }
×
713
                c.serviceQueue.Forget(obj)
×
714
                return nil
×
715
        }(obj)
716
        if err != nil {
×
UNCOV
717
                utilruntime.HandleError(err)
×
718
                return true
×
719
        }
×
720
        return true
×
721
}
722

UNCOV
723
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
UNCOV
724
        oldPod := oldObj.(*v1.Pod)
×
725
        newPod := newObj.(*v1.Pod)
×
726
        key := cache.MetaObjectToName(newPod).String()
×
727

×
NEW
728
        // Handle NAT GW pod update for EIP route management
×
NEW
729
        if c.config.EnableNodeLocalAccessVpcNatGwEIP {
×
NEW
730
                c.handleNatGwPodUpdate(oldPod, newPod)
×
NEW
731
        }
×
732

733
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
734
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
735
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
736
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
737
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
738
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
739
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
740
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
741
                c.updatePodQueue.Add(key)
×
742
                return
×
743
        }
×
744

745
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
UNCOV
746
        if err != nil {
×
747
                return
×
748
        }
×
749
        for _, multiNet := range attachNets {
×
750
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
751
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
752
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
753
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
754
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
755
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
756
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
757
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
758
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
759
                                c.updatePodQueue.Add(key)
×
760
                        }
×
761
                }
762
        }
763
}
764

UNCOV
765
func (c *Controller) runUpdatePodWorker() {
×
UNCOV
766
        for c.processNextUpdatePodWorkItem() {
×
767
        }
×
768
}
769

UNCOV
770
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
UNCOV
771
        key, shutdown := c.updatePodQueue.Get()
×
772
        if shutdown {
×
773
                return false
×
774
        }
×
775

776
        err := func(key string) error {
×
UNCOV
777
                defer c.updatePodQueue.Done(key)
×
778
                if err := c.handleUpdatePod(key); err != nil {
×
779
                        c.updatePodQueue.AddRateLimited(key)
×
780
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
781
                }
×
782
                c.updatePodQueue.Forget(key)
×
783
                return nil
×
784
        }(key)
785
        if err != nil {
×
UNCOV
786
                utilruntime.HandleError(err)
×
787
                return true
×
788
        }
×
789
        return true
×
790
}
791

UNCOV
792
func (c *Controller) gcInterfaces() {
×
UNCOV
793
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
794
        if err != nil {
×
795
                klog.Errorf("failed to list interface pod map: %v", err)
×
796
                return
×
797
        }
×
798
        for iface, pod := range interfacePodMap {
×
799
                parts := strings.Split(pod, "/")
×
800
                if len(parts) < 3 {
×
801
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
802
                        continue
×
803
                }
804

UNCOV
805
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
UNCOV
806
                if strings.Contains(errText, "No such device") {
×
807
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
808
                        if err := ovs.CleanInterface(iface); err != nil {
×
809
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
810
                        }
×
811
                        continue
×
812
                }
813

UNCOV
814
                if _, err = c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
UNCOV
815
                        if !k8serrors.IsNotFound(err) {
×
816
                                klog.Errorf("failed to get pod %s/%s: %v", podNamespace, podName, err)
×
817
                                continue
×
818
                        }
819

820
                        // Pod not found by name. Check if this might be a KubeVirt VM.
821
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VM name (not the launcher pod name).
822
                        // The actual launcher pod has the label 'vm.kubevirt.io/name' with the VM name as value.
823
                        // Try to find launcher pods by this label.
UNCOV
824
                        selector := labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: podName})
×
UNCOV
825
                        launcherPods, err := c.podsLister.Pods(podNamespace).List(selector)
×
826
                        if err != nil {
×
827
                                klog.Errorf("failed to list launcher pods for vm %s/%s: %v", podNamespace, podName, err)
×
828
                                continue
×
829
                        }
830

831
                        // If we found launcher pod(s) for this VM, keep the interface
UNCOV
832
                        if len(launcherPods) > 0 {
×
UNCOV
833
                                klog.V(5).Infof("found %d launcher pod(s) for vm %s/%s, keeping ovs interface %s",
×
834
                                        len(launcherPods), podNamespace, podName, iface)
×
835
                                continue
×
836
                        }
837

838
                        // No pod on this node and no launcher pod found - safe to delete
UNCOV
839
                        klog.Infof("pod %s/%s not found on this node, delete ovs interface %s", podNamespace, podName, iface)
×
UNCOV
840
                        if err = ovs.CleanInterface(iface); err != nil {
×
841
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
842
                        }
×
843
                }
844
        }
845
}
846

UNCOV
847
func (c *Controller) runIPSecWorker() {
×
UNCOV
848
        if err := c.StartIPSecService(); err != nil {
×
849
                klog.Errorf("starting ipsec service: %v", err)
×
850
        }
×
851

852
        for c.processNextIPSecWorkItem() {
×
UNCOV
853
        }
×
854
}
855

UNCOV
856
func (c *Controller) processNextIPSecWorkItem() bool {
×
UNCOV
857
        key, shutdown := c.ipsecQueue.Get()
×
858
        if shutdown {
×
859
                return false
×
860
        }
×
861
        defer c.ipsecQueue.Done(key)
×
862

×
863
        err := func(key string) error {
×
864
                if err := c.SyncIPSecKeys(key); err != nil {
×
865
                        c.ipsecQueue.AddRateLimited(key)
×
866
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
867
                }
×
868
                c.ipsecQueue.Forget(key)
×
869
                return nil
×
870
        }(key)
871
        if err != nil {
×
UNCOV
872
                utilruntime.HandleError(err)
×
873
                return true
×
874
        }
×
875
        return true
×
876
}
877

UNCOV
878
func (c *Controller) runUpdateNodeWorker() {
×
UNCOV
879
        for c.processNextUpdateNodeWorkItem() {
×
880
        }
×
881
}
882

UNCOV
883
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
UNCOV
884
        key, shutdown := c.updateNodeQueue.Get()
×
885
        if shutdown {
×
886
                return false
×
887
        }
×
888

889
        err := func(key string) error {
×
UNCOV
890
                defer c.updateNodeQueue.Done(key)
×
891
                if err := c.handleUpdateNode(key); err != nil {
×
892
                        c.updateNodeQueue.AddRateLimited(key)
×
893
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
894
                }
×
895
                c.updateNodeQueue.Forget(key)
×
896
                return nil
×
897
        }(key)
898
        if err != nil {
×
UNCOV
899
                utilruntime.HandleError(err)
×
900
                return true
×
901
        }
×
902
        return true
×
903
}
904

UNCOV
905
func (c *Controller) handleUpdateNode(key string) error {
×
UNCOV
906
        node, err := c.nodesLister.Get(key)
×
907
        if err != nil {
×
908
                if k8serrors.IsNotFound(err) {
×
909
                        return nil
×
910
                }
×
911
                klog.Error(err)
×
912
                return err
×
913
        }
914

UNCOV
915
        klog.Infof("updating node networks for node %s", key)
×
UNCOV
916
        return c.config.UpdateNodeNetworks(node)
×
917
}
918

919
// Run starts the controller's periodic loops and queue workers, then blocks
// until stopCh is closed.
//
// Every work queue is shut down through a deferred call when Run returns. If
// setIPSet fails, the error is handed to util.LogFatalAndExit before any queue
// worker is started.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Deferred calls run in reverse order once stopCh is closed and Run returns.
	defer c.addOrUpdateProviderNetworkQueue.ShutDown()
	defer c.deleteProviderNetworkQueue.ShutDown()
	defer c.subnetQueue.ShutDown()
	defer c.macvlanSubnetQueue.ShutDown()
	defer c.serviceQueue.ShutDown()
	defer c.updatePodQueue.ShutDown()
	defer c.ipsecQueue.ShutDown()
	defer c.updateNodeQueue.ShutDown()
	defer c.iptablesEipQueue.ShutDown()
	defer c.iptablesEipDeleteQueue.ShutDown()
	// Maintenance loops, started before the ipset initialisation below.
	go wait.Until(c.gcInterfaces, time.Minute, stopCh)
	go wait.Until(recompute, 10*time.Minute, stopCh)
	go wait.Until(rotateLog, 1*time.Hour, stopCh)

	if err := c.setIPSet(); err != nil {
		util.LogFatalAndExit(err, "failed to set ipsets")
	}

	// NOTE: this message is logged just before the worker goroutines below are launched.
	klog.Info("Started workers")
	go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
	go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
	go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
	go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
	go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
	go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
	go wait.Until(c.runSubnetWorker, time.Second, stopCh)
	go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
	go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
	go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
	// The macvlan subnet and iptables EIP workers only run when the feature is
	// enabled; their queues are still shut down unconditionally above.
	if c.config.EnableNodeLocalAccessVpcNatGwEIP {
		go wait.Until(c.runMacvlanSubnetWorker, time.Second, stopCh)
		go wait.Until(c.runIptablesEipWorker, time.Second, stopCh)
		go wait.Until(c.runIptablesEipDeleteWorker, time.Second, stopCh)
	}
	go wait.Until(c.runGateway, 3*time.Second, stopCh)
	go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
	go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
	// Periodic reconcile with a nil event, independent of the subnet queue.
	go wait.Until(func() {
		if err := c.reconcileRouters(nil); err != nil {
			klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
		}
	}, 3*time.Second, stopCh)

	if c.config.EnableTProxy {
		go c.StartTProxyForwarding()
		go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
		// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
		// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
		// kubelet to tproxy, if probe success recover the iptable rules
		go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
	} else {
		c.cleanTProxyConfig()
	}

	// With OVN IPSec disabled, stop and clear IPSec resources; a failure is only logged.
	if !c.config.EnableOVNIPSec {
		if err := c.StopAndClearIPSecResource(); err != nil {
			klog.Errorf("stop and clear ipsec resource error: %v", err)
		}
	}

	// Start OpenFlow sync loop
	go c.runFlowSync(stopCh)

	// Block until shutdown is requested; the deferred calls then run.
	<-stopCh
	klog.Info("Shutting down workers")
}
988

UNCOV
989
func recompute() {
×
UNCOV
990
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
991
        if err != nil {
×
992
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
993
        }
×
994
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc