• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 20424237756

22 Dec 2025 06:49AM UTC coverage: 22.538% (-0.06%) from 22.6%
20424237756

push

github

web-flow
add support for provider network vlan interfaces (#5949)

* add support for provider network vlan interfaces

Signed-off-by: abasitt <abdul.basit@rakuten.com>

* fix: add description for vlan interfaces

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

---------

Signed-off-by: abasitt <abdul.basit@rakuten.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Mengxin Liu <liumengxinfly@gmail.com>

10 of 282 new or added lines in 5 files covered. (3.55%)

5 existing lines in 3 files now uncovered.

12084 of 53617 relevant lines covered (22.54%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        listerv1 "k8s.io/client-go/listers/core/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        k8sexec "k8s.io/utils/exec"
30
        kubevirtv1 "kubevirt.io/api/core/v1"
31

32
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
33
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
34
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
35
        "github.com/kubeovn/kube-ovn/pkg/ovs"
36
        "github.com/kubeovn/kube-ovn/pkg/util"
37
)
38

39
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
40
type Controller struct {
41
        config *Configuration
42

43
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
44
        providerNetworksSynced          cache.InformerSynced
45
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
46
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
47

48
        vlansLister kubeovnlister.VlanLister
49
        vlansSynced cache.InformerSynced
50

51
        subnetsLister kubeovnlister.SubnetLister
52
        subnetsSynced cache.InformerSynced
53
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
54

55
        ovnEipsLister kubeovnlister.OvnEipLister
56
        ovnEipsSynced cache.InformerSynced
57

58
        podsLister     listerv1.PodLister
59
        podsSynced     cache.InformerSynced
60
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
61
        deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent]
62

63
        nodesLister     listerv1.NodeLister
64
        nodesSynced     cache.InformerSynced
65
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
66

67
        servicesLister listerv1.ServiceLister
68
        servicesSynced cache.InformerSynced
69
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
70

71
        caSecretLister listerv1.SecretLister
72
        caSecretSynced cache.InformerSynced
73
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
74

75
        recorder record.EventRecorder
76

77
        protocol string
78

79
        ControllerRuntime
80
        localPodName   string
81
        localNamespace string
82

83
        k8sExec k8sexec.Interface
84
}
85

86
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
87
        if rateLimiter == nil {
×
88
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
89
        }
×
90
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
91
}
92

93
// NewController init a daemon controller
94
func NewController(config *Configuration,
95
        stopCh <-chan struct{},
96
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
97
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
98
) (*Controller, error) {
×
99
        eventBroadcaster := record.NewBroadcaster()
×
100
        eventBroadcaster.StartLogging(klog.Infof)
×
101
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
102
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
103
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
104
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
105
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
106
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
107
        podInformer := podInformerFactory.Core().V1().Pods()
×
108
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
109
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
110
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
111

×
112
        controller := &Controller{
×
113
                config: config,
×
114

×
115
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
116
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
117
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
118
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
119

×
120
                vlansLister: vlanInformer.Lister(),
×
121
                vlansSynced: vlanInformer.Informer().HasSynced,
×
122

×
123
                subnetsLister: subnetInformer.Lister(),
×
124
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
125
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
126

×
127
                ovnEipsLister: ovnEipInformer.Lister(),
×
128
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
129

×
130
                podsLister:     podInformer.Lister(),
×
131
                podsSynced:     podInformer.Informer().HasSynced,
×
132
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
133
                deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil),
×
134

×
135
                nodesLister:     nodeInformer.Lister(),
×
136
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
137
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
138

×
139
                servicesLister: servicesInformer.Lister(),
×
140
                servicesSynced: servicesInformer.Informer().HasSynced,
×
141
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
142

×
143
                caSecretLister: caSecretInformer.Lister(),
×
144
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
145
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
146

×
147
                recorder: recorder,
×
148
                k8sExec:  k8sexec.New(),
×
149
        }
×
150

×
151
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
152
        if err != nil {
×
153
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
154
        }
×
155
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
156

×
157
        if err = controller.initRuntime(); err != nil {
×
158
                return nil, err
×
159
        }
×
160

161
        podInformerFactory.Start(stopCh)
×
162
        nodeInformerFactory.Start(stopCh)
×
163
        kubeovnInformerFactory.Start(stopCh)
×
164
        caSecretInformerFactory.Start(stopCh)
×
165

×
166
        if !cache.WaitForCacheSync(stopCh,
×
167
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
168
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
169
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
170
        }
×
171

172
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
173
                AddFunc:    controller.enqueueAddProviderNetwork,
×
174
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
175
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
176
        }); err != nil {
×
177
                return nil, err
×
178
        }
×
179
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
180
                UpdateFunc: controller.enqueueUpdateVlan,
×
181
        }); err != nil {
×
182
                return nil, err
×
183
        }
×
184
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
185
                AddFunc:    controller.enqueueAddSubnet,
×
186
                UpdateFunc: controller.enqueueUpdateSubnet,
×
187
                DeleteFunc: controller.enqueueDeleteSubnet,
×
188
        }); err != nil {
×
189
                return nil, err
×
190
        }
×
191
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
192
                AddFunc:    controller.enqueueAddService,
×
193
                DeleteFunc: controller.enqueueDeleteService,
×
194
                UpdateFunc: controller.enqueueUpdateService,
×
195
        }); err != nil {
×
196
                util.LogFatalAndExit(err, "failed to add service event handler")
×
197
        }
×
198

199
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
200
                UpdateFunc: controller.enqueueUpdatePod,
×
201
                DeleteFunc: controller.enqueueDeletePod,
×
202
        }); err != nil {
×
203
                return nil, err
×
204
        }
×
205
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
206
                AddFunc:    controller.enqueueAddIPSecCA,
×
207
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
208
        }); err != nil {
×
209
                return nil, err
×
210
        }
×
211
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
212
                UpdateFunc: controller.enqueueUpdateNode,
×
213
        }); err != nil {
×
214
                return nil, err
×
215
        }
×
216

217
        return controller, nil
×
218
}
219

220
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
221
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
222
        klog.V(3).Infof("enqueue add CA %s", key)
×
223
        c.ipsecQueue.Add(key)
×
224
}
×
225

226
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
227
        oldSecret := oldObj.(*v1.Secret)
×
228
        newSecret := newObj.(*v1.Secret)
×
229
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
230
                // No changes in CA data, no need to enqueue
×
231
                return
×
232
        }
×
233

234
        key := cache.MetaObjectToName(newSecret).String()
×
235
        klog.V(3).Infof("enqueue update CA %s", key)
×
236
        c.ipsecQueue.Add(key)
×
237
}
238

239
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
240
        oldNode := oldObj.(*v1.Node)
×
241
        newNode := newObj.(*v1.Node)
×
242
        if newNode.Name != c.config.NodeName {
×
243
                return
×
244
        }
×
245
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
246
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
247
                c.updateNodeQueue.Add(newNode.Name)
×
248
        }
×
249
}
250

251
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
252
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
253
        klog.V(3).Infof("enqueue add provider network %s", key)
×
254
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
255
}
×
256

257
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
258
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
259
        klog.V(3).Infof("enqueue update provider network %s", key)
×
260
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
261
}
×
262

263
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
264
        var pn *kubeovnv1.ProviderNetwork
×
265
        switch t := obj.(type) {
×
266
        case *kubeovnv1.ProviderNetwork:
×
267
                pn = t
×
268
        case cache.DeletedFinalStateUnknown:
×
269
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
270
                if !ok {
×
271
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
272
                        return
×
273
                }
×
274
                pn = p
×
275
        default:
×
276
                klog.Warningf("unexpected type: %T", obj)
×
277
                return
×
278
        }
279

280
        key := cache.MetaObjectToName(pn).String()
×
281
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
282
        c.deleteProviderNetworkQueue.Add(pn)
×
283
}
284

285
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
286
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
287
        }
×
288
}
289

290
func (c *Controller) runDeleteProviderNetworkWorker() {
×
291
        for c.processNextDeleteProviderNetworkWorkItem() {
×
292
        }
×
293
}
294

295
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
296
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
297
        if shutdown {
×
298
                return false
×
299
        }
×
300

301
        err := func(key string) error {
×
302
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
303
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
304
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
305
                }
×
306
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
307
                return nil
×
308
        }(key)
309
        if err != nil {
×
310
                utilruntime.HandleError(err)
×
311
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
312
                return true
×
313
        }
×
314
        return true
×
315
}
316

317
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
318
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
319
        if shutdown {
×
320
                return false
×
321
        }
×
322

323
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
324
                defer c.deleteProviderNetworkQueue.Done(obj)
×
325
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
326
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
327
                }
×
328
                c.deleteProviderNetworkQueue.Forget(obj)
×
329
                return nil
×
330
        }(obj)
331
        if err != nil {
×
332
                utilruntime.HandleError(err)
×
333
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
334
                return true
×
335
        }
×
336
        return true
×
337
}
338

339
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
340
        klog.V(3).Infof("handle update provider network %s", key)
×
341
        node, err := c.nodesLister.Get(c.config.NodeName)
×
342
        if err != nil {
×
343
                klog.Error(err)
×
344
                return err
×
345
        }
×
346
        pn, err := c.providerNetworksLister.Get(key)
×
347
        if err != nil {
×
348
                if k8serrors.IsNotFound(err) {
×
349
                        return nil
×
350
                }
×
351
                klog.Error(err)
×
352
                return err
×
353
        }
354

355
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
356
        if err != nil {
×
357
                klog.Error(err)
×
358
                return err
×
359
        }
×
360

361
        if excluded {
×
362
                c.recordProviderNetworkErr(pn.Name, "")
×
363
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
364
        }
×
365
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
366
}
367

368
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
369
        nic := pn.Spec.DefaultInterface
×
370
        for _, item := range pn.Spec.CustomInterfaces {
×
371
                if slices.Contains(item.Nodes, node.Name) {
×
372
                        nic = item.Interface
×
373
                        break
×
374
                }
375
        }
376

377
        patch := util.KVPatch{
×
378
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
379
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
380
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
381
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
NEW
382
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
383
        }
×
384

×
385
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
386
        for _, vlanName := range pn.Status.Vlans {
×
387
                vlan, err := c.vlansLister.Get(vlanName)
×
388
                if err != nil {
×
389
                        if k8serrors.IsNotFound(err) {
×
390
                                klog.Infof("vlan %s not found", vlanName)
×
391
                                continue
×
392
                        }
393
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
394
                        return err
×
395
                }
396
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
397
        }
398
        // always add trunk 0 so that the ovs bridge can communicate with the external network
399
        vlans.Add("0")
×
400

×
NEW
401
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
402
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
403
                parts := strings.SplitN(nic, ".", 2)
×
404
                parentIf := parts[0]
×
405
                if !util.CheckInterfaceExists(nic) {
×
406
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
407
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
408
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
409
                                return err
×
410
                        }
×
411
                } else {
×
412
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
413
                }
×
414
        }
415

416
        // VLAN sub-interface handling - use map for efficiency
NEW
417
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
NEW
418

×
NEW
419
        // Process explicitly specified VLAN interfaces
×
NEW
420
        if len(pn.Spec.VlanInterfaces) > 0 {
×
NEW
421
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
NEW
422
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
NEW
423
                        if util.CheckInterfaceExists(vlanIfName) {
×
NEW
424
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
NEW
425
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
NEW
426
                                if err != nil {
×
NEW
427
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
NEW
428
                                        continue
×
429
                                }
NEW
430
                                vlanInterfaceMap[vlanIfName] = vlanID
×
NEW
431
                                vlans.Add(strconv.Itoa(vlanID))
×
NEW
432
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
NEW
433
                        } else {
×
NEW
434
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
NEW
435
                        }
×
436
                }
437
        }
438

439
        // Auto-detection of additional VLAN interfaces (if enabled)
NEW
440
        if pn.Spec.PreserveVlanInterfaces {
×
NEW
441
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
NEW
442
                vlanIDs := util.DetectVlanInterfaces(nic)
×
NEW
443
                for _, vlanID := range vlanIDs {
×
NEW
444
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
NEW
445
                        // Only add if not already explicitly specified
×
NEW
446
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
NEW
447
                                vlanInterfaceMap[vlanIfName] = vlanID
×
NEW
448
                                vlans.Add(strconv.Itoa(vlanID))
×
NEW
449
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
NEW
450
                        } else {
×
NEW
451
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
NEW
452
                        }
×
453
                }
NEW
454
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
455
        }
456

457
        var mtu int
×
458
        var err error
×
459
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
NEW
460
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
NEW
461
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
462
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
463
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
464
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
465
                }
×
466
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
467
                return err
×
468
        }
469

470
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
471
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
472
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
NEW
473
        if len(vlanInterfaceMap) > 0 {
×
NEW
474
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
NEW
475
        }
×
476
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
477
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
478
                return err
×
479
        }
×
480
        c.recordProviderNetworkErr(pn.Name, "")
×
481
        return nil
×
482
}
483

484
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
485
        var currentPod *v1.Pod
×
486
        var err error
×
487
        if c.localPodName == "" {
×
488
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
489
                        LabelSelector: "app=kube-ovn-cni",
×
490
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
491
                })
×
492
                if err != nil {
×
493
                        klog.Errorf("failed to list pod: %v", err)
×
494
                        return
×
495
                }
×
496
                for _, pod := range pods.Items {
×
497
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
498
                                c.localPodName = pod.Name
×
499
                                c.localNamespace = pod.Namespace
×
500
                                currentPod = &pod
×
501
                                break
×
502
                        }
503
                }
504
                if currentPod == nil {
×
505
                        klog.Warning("failed to get self pod")
×
506
                        return
×
507
                }
×
508
        } else {
×
509
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
510
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
511
                        return
×
512
                }
×
513
        }
514

515
        patch := util.KVPatch{}
×
516
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
517
                if errMsg == "" {
×
518
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
519
                } else {
×
520
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
521
                }
×
522
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
523
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
524
                        return
×
525
                }
×
526
        }
527
}
528

529
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
530
        patch := util.KVPatch{
×
531
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
532
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
533
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
534
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
535
        }
×
536
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
537
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
538
                return err
×
539
        }
×
540

541
        return c.ovsCleanProviderNetwork(pn.Name)
×
542
}
543

544
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
545
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
546
                klog.Error(err)
×
547
                return err
×
548
        }
×
549

550
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name); err != nil {
×
551
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
552
                return err
×
553
        }
×
554

555
        node, err := c.nodesLister.Get(c.config.NodeName)
×
556
        if err != nil {
×
557
                klog.Error(err)
×
558
                return err
×
559
        }
×
560
        if len(node.Labels) == 0 {
×
561
                return nil
×
562
        }
×
563

564
        patch := util.KVPatch{
×
565
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
566
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
567
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
568
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
NEW
569
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
570
        }
×
571
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
572
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
573
                return err
×
574
        }
×
575

576
        return nil
×
577
}
578

579
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
580
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
581
        newVlan := newObj.(*kubeovnv1.Vlan)
×
582
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
583
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
584
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
585
        }
×
586
}
587

588
type subnetEvent struct {
589
        oldObj, newObj any
590
}
591

592
type serviceEvent struct {
593
        oldObj, newObj any
594
}
595

596
type podEvent struct {
597
        oldObj any
598
}
599

600
func (c *Controller) enqueueAddSubnet(obj any) {
×
601
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
602
}
×
603

604
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
605
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
606
}
×
607

608
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
609
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
610
}
×
611

612
func (c *Controller) runSubnetWorker() {
×
613
        for c.processNextSubnetWorkItem() {
×
614
        }
×
615
}
616

617
func (c *Controller) enqueueAddService(obj any) {
×
618
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
619
}
×
620

621
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
622
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
623
}
×
624

625
func (c *Controller) enqueueDeleteService(obj any) {
×
626
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
627
}
×
628

629
func (c *Controller) runAddOrUpdateServicekWorker() {
×
630
        for c.processNextServiceWorkItem() {
×
631
        }
×
632
}
633

634
func (c *Controller) processNextSubnetWorkItem() bool {
×
635
        obj, shutdown := c.subnetQueue.Get()
×
636
        if shutdown {
×
637
                return false
×
638
        }
×
639

640
        err := func(obj *subnetEvent) error {
×
641
                defer c.subnetQueue.Done(obj)
×
642
                if err := c.reconcileRouters(obj); err != nil {
×
643
                        c.subnetQueue.AddRateLimited(obj)
×
644
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
645
                }
×
646
                c.subnetQueue.Forget(obj)
×
647
                return nil
×
648
        }(obj)
649
        if err != nil {
×
650
                utilruntime.HandleError(err)
×
651
                return true
×
652
        }
×
653
        return true
×
654
}
655

656
func (c *Controller) processNextServiceWorkItem() bool {
×
657
        obj, shutdown := c.serviceQueue.Get()
×
658
        if shutdown {
×
659
                return false
×
660
        }
×
661

662
        err := func(obj *serviceEvent) error {
×
663
                defer c.serviceQueue.Done(obj)
×
664
                if err := c.reconcileServices(obj); err != nil {
×
665
                        c.serviceQueue.AddRateLimited(obj)
×
666
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
667
                }
×
668
                c.serviceQueue.Forget(obj)
×
669
                return nil
×
670
        }(obj)
671
        if err != nil {
×
672
                utilruntime.HandleError(err)
×
673
                return true
×
674
        }
×
675
        return true
×
676
}
677

678
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
679
        oldPod := oldObj.(*v1.Pod)
×
680
        newPod := newObj.(*v1.Pod)
×
681
        key := cache.MetaObjectToName(newPod).String()
×
682

×
683
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
684
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
685
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
686
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
687
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
688
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
689
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
690
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
691
                c.updatePodQueue.Add(key)
×
692
                return
×
693
        }
×
694

695
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
696
        if err != nil {
×
697
                return
×
698
        }
×
699
        for _, multiNet := range attachNets {
×
700
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
701
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
702
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
703
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
704
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
705
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
706
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
707
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
708
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
709
                                c.updatePodQueue.Add(key)
×
710
                        }
×
711
                }
712
        }
713
}
714

715
func (c *Controller) enqueueDeletePod(obj any) {
×
716
        var pod *v1.Pod
×
717
        switch t := obj.(type) {
×
718
        case *v1.Pod:
×
719
                pod = t
×
720
        case cache.DeletedFinalStateUnknown:
×
721
                p, ok := t.Obj.(*v1.Pod)
×
722
                if !ok {
×
723
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
724
                        return
×
725
                }
×
726
                pod = p
×
727
        default:
×
728
                klog.Warningf("unexpected type: %T", obj)
×
729
                return
×
730
        }
731

732
        klog.V(3).Infof("enqueue delete pod %s", pod.Name)
×
733
        c.deletePodQueue.Add(&podEvent{oldObj: pod})
×
734
}
735

736
func (c *Controller) runUpdatePodWorker() {
×
737
        for c.processNextUpdatePodWorkItem() {
×
738
        }
×
739
}
740

741
func (c *Controller) runDeletePodWorker() {
×
742
        for c.processNextDeletePodWorkItem() {
×
743
        }
×
744
}
745

746
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
747
        key, shutdown := c.updatePodQueue.Get()
×
748
        if shutdown {
×
749
                return false
×
750
        }
×
751

752
        err := func(key string) error {
×
753
                defer c.updatePodQueue.Done(key)
×
754
                if err := c.handleUpdatePod(key); err != nil {
×
755
                        c.updatePodQueue.AddRateLimited(key)
×
756
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
757
                }
×
758
                c.updatePodQueue.Forget(key)
×
759
                return nil
×
760
        }(key)
761
        if err != nil {
×
762
                utilruntime.HandleError(err)
×
763
                return true
×
764
        }
×
765
        return true
×
766
}
767

768
func (c *Controller) processNextDeletePodWorkItem() bool {
×
769
        event, shutdown := c.deletePodQueue.Get()
×
770
        if shutdown {
×
771
                return false
×
772
        }
×
773

774
        err := func(event *podEvent) error {
×
775
                defer c.deletePodQueue.Done(event)
×
776
                if err := c.handleDeletePod(event); err != nil {
×
777
                        c.deletePodQueue.AddRateLimited(event)
×
778
                        return fmt.Errorf("error syncing pod event: %w, requeuing", err)
×
779
                }
×
780
                c.deletePodQueue.Forget(event)
×
781
                return nil
×
782
        }(event)
783
        if err != nil {
×
784
                utilruntime.HandleError(err)
×
785
                return true
×
786
        }
×
787
        return true
×
788
}
789

790
func (c *Controller) gcInterfaces() {
×
791
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
792
        if err != nil {
×
793
                klog.Errorf("failed to list interface pod map: %v", err)
×
794
                return
×
795
        }
×
796
        for iface, pod := range interfacePodMap {
×
797
                parts := strings.Split(pod, "/")
×
798
                if len(parts) < 3 {
×
799
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
800
                        continue
×
801
                }
802

803
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
804
                if strings.Contains(errText, "No such device") {
×
805
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
806
                        if err := ovs.CleanInterface(iface); err != nil {
×
807
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
808
                        }
×
809
                        continue
×
810
                }
811

812
                if podEntity, err := c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
813
                        // Pod not found by name. Check if this might be a KubeVirt VM.
×
814
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VM name (not the launcher pod name).
×
815
                        // The actual launcher pod has the label 'vm.kubevirt.io/name' with the VM name as value.
×
816
                        // Try to find launcher pods by this label.
×
817
                        if k8serrors.IsNotFound(err) {
×
818
                                selector := labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: podName})
×
819
                                launcherPods, listErr := c.podsLister.Pods(podNamespace).List(selector)
×
820
                                if listErr != nil {
×
821
                                        klog.Errorf("failed to list launcher pods for vm %s/%s: %v", podNamespace, podName, listErr)
×
822
                                        continue
×
823
                                }
824

825
                                // If we found launcher pod(s) for this VM, keep the interface
826
                                if len(launcherPods) > 0 {
×
827
                                        klog.V(5).Infof("found %d launcher pod(s) for vm %s/%s, keeping ovs interface %s",
×
828
                                                len(launcherPods), podNamespace, podName, iface)
×
829
                                        continue
×
830
                                }
831

832
                                // No pod and no launcher pod found - safe to delete
833
                                klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
834
                                if err := ovs.CleanInterface(iface); err != nil {
×
835
                                        klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
836
                                }
×
837
                        }
838
                } else {
×
839
                        // If the pod is found, compare the pod's node with the current cni node. If they differ, delete the interface.
×
840
                        if podEntity.Spec.NodeName != c.config.NodeName {
×
841
                                klog.Infof("pod %s/%s is on node %s, delete ovs interface %s on node %s ", podNamespace, podName, podEntity.Spec.NodeName, iface, c.config.NodeName)
×
842
                                if err := ovs.CleanInterface(iface); err != nil {
×
843
                                        klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
844
                                }
×
845
                        }
846
                }
847
        }
848
}
849

850
func (c *Controller) runIPSecWorker() {
×
851
        if err := c.StartIPSecService(); err != nil {
×
852
                klog.Errorf("starting ipsec service: %v", err)
×
853
        }
×
854

855
        for c.processNextIPSecWorkItem() {
×
856
        }
×
857
}
858

859
func (c *Controller) processNextIPSecWorkItem() bool {
×
860
        key, shutdown := c.ipsecQueue.Get()
×
861
        if shutdown {
×
862
                return false
×
863
        }
×
864
        defer c.ipsecQueue.Done(key)
×
865

×
866
        err := func(key string) error {
×
867
                if err := c.SyncIPSecKeys(key); err != nil {
×
868
                        c.ipsecQueue.AddRateLimited(key)
×
869
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
870
                }
×
871
                c.ipsecQueue.Forget(key)
×
872
                return nil
×
873
        }(key)
874
        if err != nil {
×
875
                utilruntime.HandleError(err)
×
876
                return true
×
877
        }
×
878
        return true
×
879
}
880

881
func (c *Controller) runUpdateNodeWorker() {
×
882
        for c.processNextUpdateNodeWorkItem() {
×
883
        }
×
884
}
885

886
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
887
        key, shutdown := c.updateNodeQueue.Get()
×
888
        if shutdown {
×
889
                return false
×
890
        }
×
891

892
        err := func(key string) error {
×
893
                defer c.updateNodeQueue.Done(key)
×
894
                if err := c.handleUpdateNode(key); err != nil {
×
895
                        c.updateNodeQueue.AddRateLimited(key)
×
896
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
897
                }
×
898
                c.updateNodeQueue.Forget(key)
×
899
                return nil
×
900
        }(key)
901
        if err != nil {
×
902
                utilruntime.HandleError(err)
×
903
                return true
×
904
        }
×
905
        return true
×
906
}
907

908
func (c *Controller) handleUpdateNode(key string) error {
×
909
        node, err := c.nodesLister.Get(key)
×
910
        if err != nil {
×
911
                if k8serrors.IsNotFound(err) {
×
912
                        return nil
×
913
                }
×
914
                klog.Error(err)
×
915
                return err
×
916
        }
917

918
        klog.Infof("updating node networks for node %s", key)
×
919
        return c.config.UpdateNodeNetworks(node)
×
920
}
921

922
// Run starts controller
923
func (c *Controller) Run(stopCh <-chan struct{}) {
×
924
        defer utilruntime.HandleCrash()
×
925
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
926
        defer c.deleteProviderNetworkQueue.ShutDown()
×
927
        defer c.subnetQueue.ShutDown()
×
928
        defer c.serviceQueue.ShutDown()
×
929
        defer c.updatePodQueue.ShutDown()
×
930
        defer c.deletePodQueue.ShutDown()
×
931
        defer c.ipsecQueue.ShutDown()
×
932
        defer c.updateNodeQueue.ShutDown()
×
933
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
934
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
935
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
936

×
937
        if err := c.setIPSet(); err != nil {
×
938
                util.LogFatalAndExit(err, "failed to set ipsets")
×
939
        }
×
940

941
        klog.Info("Started workers")
×
942
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
943
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
944
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
945
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
946
        go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
×
947
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
948
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
949
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
950
        go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
×
951
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
952
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
953
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
954
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
955
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
956
        go wait.Until(func() {
×
957
                if err := c.reconcileRouters(nil); err != nil {
×
958
                        klog.Errorf("failed to reconcile ovn0 routes: %v", err)
×
959
                }
×
960
        }, 3*time.Second, stopCh)
961

962
        if c.config.EnableTProxy {
×
963
                go c.StartTProxyForwarding()
×
964
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
965
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
966
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
967
                // kubelet to tproxy, if probe success recover the iptable rules
×
968
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
969
        } else {
×
970
                c.cleanTProxyConfig()
×
971
        }
×
972

973
        if !c.config.EnableOVNIPSec {
×
974
                if err := c.StopAndClearIPSecResource(); err != nil {
×
975
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
976
                }
×
977
        }
978

979
        <-stopCh
×
980
        klog.Info("Shutting down workers")
×
981
}
982

983
func recompute() {
×
984
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
985
        if err != nil {
×
986
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
987
        }
×
988
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc