• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 22657101833

04 Mar 2026 05:55AM UTC coverage: 23.165% (-0.005%) from 23.17%
22657101833

push

github

web-flow
fix: add retry logic to checkLeader() and checkDaemonSet() in kubectl-ko (#6387)

checkLeader() could fail when the endpoints field of the OVN SB
EndpointSlice is transiently nil because of brief readiness-probe
failures, causing `ko diagnose IPPorts` to exit immediately. Add retry
logic (10 attempts, 1s interval), consistent with the existing
getLeaderPod() pattern.

Also add retry logic to checkDaemonSet() (30 attempts, 1s interval)
to match the existing checkDeployment() pattern for consistency.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

12616 of 54461 relevant lines covered (23.17%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        listerv1 "k8s.io/client-go/listers/core/v1"
26
        "k8s.io/client-go/tools/cache"
27
        "k8s.io/client-go/tools/record"
28
        "k8s.io/client-go/util/workqueue"
29
        "k8s.io/klog/v2"
30
        k8sexec "k8s.io/utils/exec"
31
        kubevirtv1 "kubevirt.io/api/core/v1"
32

33
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
34
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
35
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
36
        "github.com/kubeovn/kube-ovn/pkg/ovs"
37
        "github.com/kubeovn/kube-ovn/pkg/util"
38
)
39

40
// Controller watches pod and namespace changes to update iptables, ipset and ovs qos.
//
// For each watched resource kind it holds a lister (read access to the
// informer cache), a "synced" predicate reporting whether that cache has
// been populated, and, where events are processed asynchronously, a typed
// rate-limiting work queue.
type Controller struct {
	config *Configuration

	// Provider networks: add/update events are queued by key, delete events
	// carry the deleted object itself.
	providerNetworksLister          kubeovnlister.ProviderNetworkLister
	providerNetworksSynced          cache.InformerSynced
	addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
	deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]

	vlansLister kubeovnlister.VlanLister
	vlansSynced cache.InformerSynced

	// Subnet events carry the old and/or new object (see subnetEvent).
	subnetsLister kubeovnlister.SubnetLister
	subnetsSynced cache.InformerSynced
	subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]

	ovnEipsLister kubeovnlister.OvnEipLister
	ovnEipsSynced cache.InformerSynced

	podsLister     listerv1.PodLister
	podsSynced     cache.InformerSynced
	updatePodQueue workqueue.TypedRateLimitingInterface[string]

	nodesLister     listerv1.NodeLister
	nodesSynced     cache.InformerSynced
	updateNodeQueue workqueue.TypedRateLimitingInterface[string]

	// Service events carry the old and/or new object (see serviceEvent).
	servicesLister listerv1.ServiceLister
	servicesSynced cache.InformerSynced
	serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]

	// Secrets watched for IPSec CA changes; changed secrets are queued by key.
	caSecretLister listerv1.SecretLister
	caSecretSynced cache.InformerSynced
	ipsecQueue     workqueue.TypedRateLimitingInterface[string]

	recorder record.EventRecorder

	// protocol is derived from the node's IP address annotation in NewController.
	protocol string

	ControllerRuntime

	k8sExec k8sexec.Interface

	// NOTE(review): presumably guards one-time start of the IPSec service —
	// its use is outside this part of the file, confirm.
	ipsecServiceStarted sync.Once

	// channel used for fdb sync
	fdbSyncChan   chan struct{}
	fdbSyncMutex  sync.Mutex
	vswitchClient ovs.Vswitch
}
90

91
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
92
        if rateLimiter == nil {
×
93
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
94
        }
×
95
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
96
}
97

98
// NewController init a daemon controller
99
func NewController(config *Configuration,
100
        stopCh <-chan struct{},
101
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
102
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
103
) (*Controller, error) {
×
104
        eventBroadcaster := record.NewBroadcaster()
×
105
        eventBroadcaster.StartLogging(klog.Infof)
×
106
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
107
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
108
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
109
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
110
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
111
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
112
        podInformer := podInformerFactory.Core().V1().Pods()
×
113
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
114
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
115
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
116

×
117
        controller := &Controller{
×
118
                config: config,
×
119

×
120
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
121
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
122
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
123
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
124

×
125
                vlansLister: vlanInformer.Lister(),
×
126
                vlansSynced: vlanInformer.Informer().HasSynced,
×
127

×
128
                subnetsLister: subnetInformer.Lister(),
×
129
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
130
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
131

×
132
                ovnEipsLister: ovnEipInformer.Lister(),
×
133
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
134

×
135
                podsLister:     podInformer.Lister(),
×
136
                podsSynced:     podInformer.Informer().HasSynced,
×
137
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
138

×
139
                nodesLister:     nodeInformer.Lister(),
×
140
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
141
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
142

×
143
                servicesLister: servicesInformer.Lister(),
×
144
                servicesSynced: servicesInformer.Informer().HasSynced,
×
145
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
146

×
147
                caSecretLister: caSecretInformer.Lister(),
×
148
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
149
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
150

×
151
                recorder: recorder,
×
152
                k8sExec:  k8sexec.New(),
×
153

×
154
                fdbSyncChan: make(chan struct{}, 1),
×
155
        }
×
156

×
157
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
158
        if err != nil {
×
159
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
160
        }
×
161
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
162

×
163
        if err = controller.initRuntime(); err != nil {
×
164
                return nil, err
×
165
        }
×
166

167
        podInformerFactory.Start(stopCh)
×
168
        nodeInformerFactory.Start(stopCh)
×
169
        kubeovnInformerFactory.Start(stopCh)
×
170
        caSecretInformerFactory.Start(stopCh)
×
171

×
172
        if !cache.WaitForCacheSync(stopCh,
×
173
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
174
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
175
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
176
        }
×
177

178
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
179
                AddFunc:    controller.enqueueAddProviderNetwork,
×
180
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
181
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
182
        }); err != nil {
×
183
                return nil, err
×
184
        }
×
185
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
186
                UpdateFunc: controller.enqueueUpdateVlan,
×
187
        }); err != nil {
×
188
                return nil, err
×
189
        }
×
190
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
191
                AddFunc:    controller.enqueueAddSubnet,
×
192
                UpdateFunc: controller.enqueueUpdateSubnet,
×
193
                DeleteFunc: controller.enqueueDeleteSubnet,
×
194
        }); err != nil {
×
195
                return nil, err
×
196
        }
×
197
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
198
                AddFunc:    controller.enqueueAddService,
×
199
                DeleteFunc: controller.enqueueDeleteService,
×
200
                UpdateFunc: controller.enqueueUpdateService,
×
201
        }); err != nil {
×
202
                util.LogFatalAndExit(err, "failed to add service event handler")
×
203
        }
×
204

205
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
206
                UpdateFunc: controller.enqueueUpdatePod,
×
207
        }); err != nil {
×
208
                return nil, err
×
209
        }
×
210
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
211
                AddFunc:    controller.enqueueAddIPSecCA,
×
212
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
213
        }); err != nil {
×
214
                return nil, err
×
215
        }
×
216
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
217
                UpdateFunc: controller.enqueueUpdateNode,
×
218
        }); err != nil {
×
219
                return nil, err
×
220
        }
×
221

222
        if controller.vswitchClient, err = ovs.NewVswitchClient("unix:/var/run/openvswitch/db.sock", 1, 3); err != nil {
×
223
                return nil, fmt.Errorf("failed to create vswitch client: %w", err)
×
224
        }
×
225

226
        return controller, nil
×
227
}
228

229
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
230
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
231
        klog.V(3).Infof("enqueue add CA %s", key)
×
232
        c.ipsecQueue.Add(key)
×
233
}
×
234

235
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
236
        oldSecret := oldObj.(*v1.Secret)
×
237
        newSecret := newObj.(*v1.Secret)
×
238
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
239
                // No changes in CA data, no need to enqueue
×
240
                return
×
241
        }
×
242

243
        key := cache.MetaObjectToName(newSecret).String()
×
244
        klog.V(3).Infof("enqueue update CA %s", key)
×
245
        c.ipsecQueue.Add(key)
×
246
}
247

248
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
249
        oldNode := oldObj.(*v1.Node)
×
250
        newNode := newObj.(*v1.Node)
×
251
        if newNode.Name != c.config.NodeName {
×
252
                return
×
253
        }
×
254
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
255
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
256
                c.updateNodeQueue.Add(newNode.Name)
×
257
        }
×
258
}
259

260
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
261
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
262
        klog.V(3).Infof("enqueue add provider network %s", key)
×
263
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
264
}
×
265

266
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
267
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
268
        klog.V(3).Infof("enqueue update provider network %s", key)
×
269
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
270
}
×
271

272
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
273
        var pn *kubeovnv1.ProviderNetwork
×
274
        switch t := obj.(type) {
×
275
        case *kubeovnv1.ProviderNetwork:
×
276
                pn = t
×
277
        case cache.DeletedFinalStateUnknown:
×
278
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
279
                if !ok {
×
280
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
281
                        return
×
282
                }
×
283
                pn = p
×
284
        default:
×
285
                klog.Warningf("unexpected type: %T", obj)
×
286
                return
×
287
        }
288

289
        key := cache.MetaObjectToName(pn).String()
×
290
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
291
        c.deleteProviderNetworkQueue.Add(pn)
×
292
}
293

294
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
295
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
296
        }
×
297
}
298

299
func (c *Controller) runDeleteProviderNetworkWorker() {
×
300
        for c.processNextDeleteProviderNetworkWorkItem() {
×
301
        }
×
302
}
303

304
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
305
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
306
        if shutdown {
×
307
                return false
×
308
        }
×
309

310
        err := func(key string) error {
×
311
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
312
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
313
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
314
                }
×
315
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
316
                return nil
×
317
        }(key)
318
        if err != nil {
×
319
                utilruntime.HandleError(err)
×
320
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
321
                return true
×
322
        }
×
323
        return true
×
324
}
325

326
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
327
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
328
        if shutdown {
×
329
                return false
×
330
        }
×
331

332
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
333
                defer c.deleteProviderNetworkQueue.Done(obj)
×
334
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
335
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
336
                }
×
337
                c.deleteProviderNetworkQueue.Forget(obj)
×
338
                return nil
×
339
        }(obj)
340
        if err != nil {
×
341
                utilruntime.HandleError(err)
×
342
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
343
                return true
×
344
        }
×
345
        return true
×
346
}
347

348
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
349
        klog.V(3).Infof("handle update provider network %s", key)
×
350
        node, err := c.nodesLister.Get(c.config.NodeName)
×
351
        if err != nil {
×
352
                klog.Error(err)
×
353
                return err
×
354
        }
×
355
        pn, err := c.providerNetworksLister.Get(key)
×
356
        if err != nil {
×
357
                if k8serrors.IsNotFound(err) {
×
358
                        return nil
×
359
                }
×
360
                klog.Error(err)
×
361
                return err
×
362
        }
363

364
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
365
        if err != nil {
×
366
                klog.Error(err)
×
367
                return err
×
368
        }
×
369

370
        if excluded {
×
371
                c.recordProviderNetworkErr(pn.Name, "")
×
372
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
373
        }
×
374
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
375
}
376

377
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
378
        nic := pn.Spec.DefaultInterface
×
379
        for _, item := range pn.Spec.CustomInterfaces {
×
380
                if slices.Contains(item.Nodes, node.Name) {
×
381
                        nic = item.Interface
×
382
                        break
×
383
                }
384
        }
385

386
        patch := util.KVPatch{
×
387
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
388
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
389
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
390
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
391
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
392
        }
×
393

×
394
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
395
        for _, vlanName := range pn.Status.Vlans {
×
396
                vlan, err := c.vlansLister.Get(vlanName)
×
397
                if err != nil {
×
398
                        if k8serrors.IsNotFound(err) {
×
399
                                klog.Infof("vlan %s not found", vlanName)
×
400
                                continue
×
401
                        }
402
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
403
                        return err
×
404
                }
405
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
406
        }
407
        // always add trunk 0 so that the ovs bridge can communicate with the external network
408
        vlans.Add("0")
×
409

×
410
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
411
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
412
                parts := strings.SplitN(nic, ".", 2)
×
413
                parentIf := parts[0]
×
414
                if !util.CheckInterfaceExists(nic) {
×
415
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
416
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
417
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
418
                                return err
×
419
                        }
×
420
                } else {
×
421
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
422
                }
×
423
        }
424

425
        // VLAN sub-interface handling - use map for efficiency
426
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
427

×
428
        // Process explicitly specified VLAN interfaces
×
429
        if len(pn.Spec.VlanInterfaces) > 0 {
×
430
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
431
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
432
                        if util.CheckInterfaceExists(vlanIfName) {
×
433
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
434
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
435
                                if err != nil {
×
436
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
437
                                        continue
×
438
                                }
439
                                vlanInterfaceMap[vlanIfName] = vlanID
×
440
                                vlans.Add(strconv.Itoa(vlanID))
×
441
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
442
                        } else {
×
443
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
444
                        }
×
445
                }
446
        }
447

448
        // Auto-detection of additional VLAN interfaces (if enabled)
449
        if pn.Spec.PreserveVlanInterfaces {
×
450
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
451
                vlanIDs := util.DetectVlanInterfaces(nic)
×
452
                for _, vlanID := range vlanIDs {
×
453
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
454
                        // Only add if not already explicitly specified
×
455
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
456
                                vlanInterfaceMap[vlanIfName] = vlanID
×
457
                                vlans.Add(strconv.Itoa(vlanID))
×
458
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
459
                        } else {
×
460
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
461
                        }
×
462
                }
463
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
464
        }
465

466
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, nic, vlanInterfaceMap); err != nil {
×
467
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
468
                return err
×
469
        }
×
470

471
        var mtu int
×
472
        var err error
×
473
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
474
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
475
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
476
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
477
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
478
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
479
                }
×
480
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
481
                return err
×
482
        }
483

484
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
485
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
486
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
487
        if len(vlanInterfaceMap) > 0 {
×
488
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
489
        }
×
490
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
491
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
492
                return err
×
493
        }
×
494
        c.recordProviderNetworkErr(pn.Name, "")
×
495
        return nil
×
496
}
497

498
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
499
        pod, err := c.podsLister.Pods(c.config.PodNamespace).Get(c.config.PodName)
×
500
        if err != nil {
×
501
                klog.Errorf("failed to get pod %s/%s, %v", c.config.PodNamespace, c.config.PodName, err)
×
502
                return
×
503
        }
×
504

505
        patch := util.KVPatch{}
×
506
        if pod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
507
                if errMsg == "" {
×
508
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
509
                } else {
×
510
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
511
                }
×
512
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
513
                        klog.Errorf("failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
514
                        return
×
515
                }
×
516
        }
517
}
518

519
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
520
        patch := util.KVPatch{
×
521
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
522
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
523
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
524
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
525
        }
×
526
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
527
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
528
                return err
×
529
        }
×
530

531
        return c.ovsCleanProviderNetwork(pn.Name)
×
532
}
533

534
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
535
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
536
                klog.Error(err)
×
537
                return err
×
538
        }
×
539

540
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil); err != nil {
×
541
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
542
                return err
×
543
        }
×
544

545
        node, err := c.nodesLister.Get(c.config.NodeName)
×
546
        if err != nil {
×
547
                klog.Error(err)
×
548
                return err
×
549
        }
×
550
        if len(node.Labels) == 0 {
×
551
                return nil
×
552
        }
×
553

554
        patch := util.KVPatch{
×
555
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
556
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
557
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
558
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
559
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
560
        }
×
561
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
562
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
563
                return err
×
564
        }
×
565

566
        return nil
×
567
}
568

569
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
570
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
571
        newVlan := newObj.(*kubeovnv1.Vlan)
×
572
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
573
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
574
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
575
        }
×
576
}
577

578
// subnetEvent is the work item placed on the subnet queue. Add events set
// only newObj, delete events set only oldObj, and update events set both.
type subnetEvent struct {
	oldObj, newObj any
}
581

582
// serviceEvent is the work item placed on the service queue. Add events set
// only newObj, delete events set only oldObj, and update events set both.
type serviceEvent struct {
	oldObj, newObj any
}
585

586
func (c *Controller) enqueueAddSubnet(obj any) {
×
587
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
588
}
×
589

590
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
591
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
592
}
×
593

594
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
595
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
596
}
×
597

598
func (c *Controller) runSubnetWorker() {
×
599
        for c.processNextSubnetWorkItem() {
×
600
        }
×
601
}
602

603
func (c *Controller) enqueueAddService(obj any) {
×
604
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
605
}
×
606

607
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
608
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
609
}
×
610

611
func (c *Controller) enqueueDeleteService(obj any) {
×
612
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
613
}
×
614

615
func (c *Controller) runAddOrUpdateServiceWorker() {
×
616
        for c.processNextServiceWorkItem() {
×
617
        }
×
618
}
619

620
func (c *Controller) processNextSubnetWorkItem() bool {
×
621
        obj, shutdown := c.subnetQueue.Get()
×
622
        if shutdown {
×
623
                return false
×
624
        }
×
625

626
        err := func(obj *subnetEvent) error {
×
627
                defer c.subnetQueue.Done(obj)
×
628
                c.requestFdbSync()
×
629
                if err := c.reconcileRouters(obj); err != nil {
×
630
                        c.subnetQueue.AddRateLimited(obj)
×
631
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
632
                }
×
633
                c.subnetQueue.Forget(obj)
×
634
                return nil
×
635
        }(obj)
636
        if err != nil {
×
637
                utilruntime.HandleError(err)
×
638
                return true
×
639
        }
×
640
        return true
×
641
}
642

643
func (c *Controller) processNextServiceWorkItem() bool {
×
644
        obj, shutdown := c.serviceQueue.Get()
×
645
        if shutdown {
×
646
                return false
×
647
        }
×
648

649
        err := func(obj *serviceEvent) error {
×
650
                defer c.serviceQueue.Done(obj)
×
651
                if err := c.reconcileServices(obj); err != nil {
×
652
                        c.serviceQueue.AddRateLimited(obj)
×
653
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
654
                }
×
655
                c.serviceQueue.Forget(obj)
×
656
                return nil
×
657
        }(obj)
658
        if err != nil {
×
659
                utilruntime.HandleError(err)
×
660
                return true
×
661
        }
×
662
        return true
×
663
}
664

665
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
666
        oldPod := oldObj.(*v1.Pod)
×
667
        newPod := newObj.(*v1.Pod)
×
668
        key := cache.MetaObjectToName(newPod).String()
×
669

×
670
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
671
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
672
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
673
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
674
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
675
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
676
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
677
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
678
                c.updatePodQueue.Add(key)
×
679
                return
×
680
        }
×
681

682
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
683
        if err != nil {
×
684
                return
×
685
        }
×
686
        for _, multiNet := range attachNets {
×
687
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
688
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
689
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
690
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
691
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
692
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
693
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
694
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
695
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
696
                                c.updatePodQueue.Add(key)
×
697
                        }
×
698
                }
699
        }
700
}
701

702
func (c *Controller) runUpdatePodWorker() {
×
703
        for c.processNextUpdatePodWorkItem() {
×
704
        }
×
705
}
706

707
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
708
        key, shutdown := c.updatePodQueue.Get()
×
709
        if shutdown {
×
710
                return false
×
711
        }
×
712

713
        err := func(key string) error {
×
714
                defer c.updatePodQueue.Done(key)
×
715
                if err := c.handleUpdatePod(key); err != nil {
×
716
                        c.updatePodQueue.AddRateLimited(key)
×
717
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
718
                }
×
719
                c.updatePodQueue.Forget(key)
×
720
                return nil
×
721
        }(key)
722
        if err != nil {
×
723
                utilruntime.HandleError(err)
×
724
                return true
×
725
        }
×
726
        return true
×
727
}
728

729
func (c *Controller) gcInterfaces() {
×
730
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
731
        if err != nil {
×
732
                klog.Errorf("failed to list interface pod map: %v", err)
×
733
                return
×
734
        }
×
735
        for iface, pod := range interfacePodMap {
×
736
                parts := strings.Split(pod, "/")
×
737
                if len(parts) < 3 {
×
738
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
739
                        continue
×
740
                }
741

742
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
743
                if strings.Contains(errText, "No such device") {
×
744
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
745
                        if err := ovs.CleanInterface(iface); err != nil {
×
746
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
747
                        }
×
748
                        continue
×
749
                }
750

751
                if _, err = c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
752
                        if !k8serrors.IsNotFound(err) {
×
753
                                klog.Errorf("failed to get pod %s/%s: %v", podNamespace, podName, err)
×
754
                                continue
×
755
                        }
756

757
                        // Pod not found by name. Check if this might be a KubeVirt VM.
758
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VM name (not the launcher pod name).
759
                        // The actual launcher pod has the label 'vm.kubevirt.io/name' with the VM name as value.
760
                        // Try to find launcher pods by this label.
761
                        selector := labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: podName})
×
762
                        launcherPods, err := c.podsLister.Pods(podNamespace).List(selector)
×
763
                        if err != nil {
×
764
                                klog.Errorf("failed to list launcher pods for vm %s/%s: %v", podNamespace, podName, err)
×
765
                                continue
×
766
                        }
767

768
                        // If we found launcher pod(s) for this VM, keep the interface
769
                        if len(launcherPods) > 0 {
×
770
                                klog.V(5).Infof("found %d launcher pod(s) for vm %s/%s, keeping ovs interface %s",
×
771
                                        len(launcherPods), podNamespace, podName, iface)
×
772
                                continue
×
773
                        }
774

775
                        // No pod on this node and no launcher pod found - safe to delete
776
                        klog.Infof("pod %s/%s not found on this node, delete ovs interface %s", podNamespace, podName, iface)
×
777
                        if err = ovs.CleanInterface(iface); err != nil {
×
778
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
779
                        }
×
780
                }
781
        }
782
}
783

784
func (c *Controller) runIPSecWorker() {
×
785
        for c.processNextIPSecWorkItem() {
×
786
        }
×
787
}
788

789
func (c *Controller) processNextIPSecWorkItem() bool {
×
790
        key, shutdown := c.ipsecQueue.Get()
×
791
        if shutdown {
×
792
                return false
×
793
        }
×
794
        defer c.ipsecQueue.Done(key)
×
795

×
796
        err := func(key string) error {
×
797
                if err := c.SyncIPSecKeys(key); err != nil {
×
798
                        c.ipsecQueue.AddRateLimited(key)
×
799
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
800
                }
×
801
                c.ipsecQueue.Forget(key)
×
802
                return nil
×
803
        }(key)
804
        if err != nil {
×
805
                utilruntime.HandleError(err)
×
806
                return true
×
807
        }
×
808
        return true
×
809
}
810

811
func (c *Controller) runUpdateNodeWorker() {
×
812
        for c.processNextUpdateNodeWorkItem() {
×
813
        }
×
814
}
815

816
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
817
        key, shutdown := c.updateNodeQueue.Get()
×
818
        if shutdown {
×
819
                return false
×
820
        }
×
821

822
        err := func(key string) error {
×
823
                defer c.updateNodeQueue.Done(key)
×
824
                if err := c.handleUpdateNode(key); err != nil {
×
825
                        c.updateNodeQueue.AddRateLimited(key)
×
826
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
827
                }
×
828
                c.updateNodeQueue.Forget(key)
×
829
                return nil
×
830
        }(key)
831
        if err != nil {
×
832
                utilruntime.HandleError(err)
×
833
                return true
×
834
        }
×
835
        return true
×
836
}
837

838
func (c *Controller) handleUpdateNode(key string) error {
×
839
        node, err := c.nodesLister.Get(key)
×
840
        if err != nil {
×
841
                if k8serrors.IsNotFound(err) {
×
842
                        return nil
×
843
                }
×
844
                klog.Error(err)
×
845
                return err
×
846
        }
847

848
        klog.Infof("updating node networks for node %s", key)
×
849
        return c.config.UpdateNodeNetworks(node)
×
850
}
851

852
// Run starts controller
853
func (c *Controller) Run(stopCh <-chan struct{}) {
×
854
        defer utilruntime.HandleCrash()
×
855
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
856
        defer c.deleteProviderNetworkQueue.ShutDown()
×
857
        defer c.subnetQueue.ShutDown()
×
858
        defer c.serviceQueue.ShutDown()
×
859
        defer c.updatePodQueue.ShutDown()
×
860
        defer c.ipsecQueue.ShutDown()
×
861
        defer c.updateNodeQueue.ShutDown()
×
862
        defer c.vswitchClient.Close()
×
863

×
864
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
865
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
866
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
867

×
868
        if err := c.setIPSet(); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to set ipsets")
×
870
        }
×
871

872
        klog.Info("Started workers")
×
873
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
874
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
875
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
876
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
877
        go wait.Until(c.runAddOrUpdateServiceWorker, time.Second, stopCh)
×
878
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
879
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
880
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
881
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
882
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
883
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
884
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
885
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
886
        go wait.Until(func() {
×
887
                if err := c.reconcileRouters(nil); err != nil {
×
888
                        klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
×
889
                }
×
890
        }, 3*time.Second, stopCh)
891

892
        if c.config.EnableTProxy {
×
893
                go c.StartTProxyForwarding()
×
894
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
895
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
896
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
897
                // kubelet to tproxy, if probe success recover the iptable rules
×
898
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
899
        } else {
×
900
                c.cleanTProxyConfig()
×
901
        }
×
902

903
        if !c.config.EnableOVNIPSec {
×
904
                if err := c.StopAndClearIPSecResource(); err != nil {
×
905
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
906
                }
×
907
        }
908

909
        // Start OpenFlow sync loop
910
        go c.runFlowSync(stopCh)
×
911

×
912
        // start fdb sync loop
×
913
        go c.runFdbSync(stopCh)
×
914

×
915
        <-stopCh
×
916
        klog.Info("Shutting down workers")
×
917
}
918

919
func recompute() {
×
920
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
921
        if err != nil {
×
922
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
923
        }
×
924
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc