• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 25472384490

07 May 2026 02:20AM UTC coverage: 24.899% (+0.06%) from 24.835%
25472384490

push

github

web-flow
feat(servicecidr): support K8s multiple ServiceCIDR (KEP-1880) (#6690)

* feat(servicecidr): support Kubernetes multiple ServiceCIDR (KEP-1880)

Watch networking.k8s.io/v1 ServiceCIDR objects and merge them with the
--service-cluster-ip-range flag value into a single source of truth used
by every Service-CIDR consumer (U2O policy routes, vpc-lb init
containers, vpc-nat-gw routes, daemon ipset/iptables). The flag now
serves as a startup fallback that yields once the API observes any valid
entry, and re-engages if the API set ever empties — so old clusters
without the API behave exactly as before, while 1.33+ clusters converge
to the API-advertised set and pick up dynamic add/remove.

ServiceCIDR API discovery uses APIResourceExists with a 10s ticker
fallback (same shape as the NAD/KubeVirt scaffolds), so missing API on
older clusters is a no-op rather than an error. RBAC for
networking.k8s.io/servicecidrs is added to system:ovn and
system:kube-ovn-cni in install.sh and both Helm charts.

Notable behavior changes:
- vpc-lb deployment is now upserted with a ServiceCIDR hash annotation;
  changing the merged set rolls the pod via the existing Recreate
  strategy.
- U2O no-LB policy routes are pruned at the start of every reconcile
  (policy-only delete, port groups untouched) so shrinking the set no
  longer leaves stale OVN entries.
- Existing VPC NAT gateways are intentionally not re-enqueued on
  ServiceCIDR change; their routes only refresh when the pod is
  recreated by other means. Newly created NAT gateways pick up the
  current store via their own add path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* refactor(servicecidr): consolidate duplicates and use stdlib helpers

- Promote readyServiceCIDRs to util.ReadyServiceCIDRs so controller and
  daemon share one source of truth.
- Drop custom equalStringSlice in favour of slices.Equal.
- Extract vpcLbInitContainers helpe... (continued)

124 of 467 new or added lines in 12 files covered. (26.55%)

2 existing lines in 1 file now uncovered.

14198 of 57023 relevant lines covered (24.9%)

0.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.22
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        "k8s.io/apimachinery/pkg/selection"
21
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
22
        "k8s.io/apimachinery/pkg/util/wait"
23
        "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        listerv1 "k8s.io/client-go/listers/core/v1"
27
        netv1lister "k8s.io/client-go/listers/networking/v1"
28
        "k8s.io/client-go/tools/cache"
29
        "k8s.io/client-go/tools/record"
30
        "k8s.io/client-go/util/workqueue"
31
        "k8s.io/klog/v2"
32
        k8sexec "k8s.io/utils/exec"
33
        kubevirtv1 "kubevirt.io/api/core/v1"
34

35
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
36
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
37
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
38
        "github.com/kubeovn/kube-ovn/pkg/ovs"
39
        "github.com/kubeovn/kube-ovn/pkg/util"
40
)
41

42
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
43
type Controller struct {
44
        config *Configuration
45

46
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
47
        providerNetworksSynced          cache.InformerSynced
48
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
49
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
50

51
        vlansLister kubeovnlister.VlanLister
52
        vlansSynced cache.InformerSynced
53

54
        subnetsLister kubeovnlister.SubnetLister
55
        subnetsSynced cache.InformerSynced
56
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
57

58
        ovnEipsLister kubeovnlister.OvnEipLister
59
        ovnEipsSynced cache.InformerSynced
60

61
        podsLister     listerv1.PodLister
62
        podsSynced     cache.InformerSynced
63
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
64

65
        nodesLister     listerv1.NodeLister
66
        nodesSynced     cache.InformerSynced
67
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
68

69
        servicesLister listerv1.ServiceLister
70
        servicesSynced cache.InformerSynced
71
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
72

73
        caSecretLister listerv1.SecretLister
74
        caSecretSynced cache.InformerSynced
75
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
76

77
        serviceCIDRStore           *util.ServiceCIDRStore
78
        serviceCIDRLister          netv1lister.ServiceCIDRLister
79
        serviceCIDRSynced          cache.InformerSynced
80
        serviceCIDRInformerFactory informers.SharedInformerFactory
81

82
        recorder record.EventRecorder
83

84
        protocol string
85

86
        ControllerRuntime
87

88
        k8sExec k8sexec.Interface
89

90
        ipsecServiceStarted sync.Once
91

92
        // channel used for fdb sync
93
        fdbSyncChan   chan struct{}
94
        fdbSyncMutex  sync.Mutex
95
        vswitchClient ovs.Vswitch
96
}
97

98
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
99
        if rateLimiter == nil {
×
100
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
101
        }
×
102
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
103
}
104

105
// NewController init a daemon controller
106
func NewController(config *Configuration,
107
        stopCh <-chan struct{},
108
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
109
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
110
) (*Controller, error) {
×
111
        eventBroadcaster := record.NewBroadcaster()
×
112
        eventBroadcaster.StartLogging(klog.Infof)
×
113
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
114
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
115
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
116
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
117
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
118
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
119
        podInformer := podInformerFactory.Core().V1().Pods()
×
120
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
121
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
122
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
123

×
124
        controller := &Controller{
×
125
                config: config,
×
126

×
127
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
128
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
129
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
130
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
131

×
132
                vlansLister: vlanInformer.Lister(),
×
133
                vlansSynced: vlanInformer.Informer().HasSynced,
×
134

×
135
                subnetsLister: subnetInformer.Lister(),
×
136
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
137
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
138

×
139
                ovnEipsLister: ovnEipInformer.Lister(),
×
140
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
141

×
142
                podsLister:     podInformer.Lister(),
×
143
                podsSynced:     podInformer.Informer().HasSynced,
×
144
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
145

×
146
                nodesLister:     nodeInformer.Lister(),
×
147
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
148
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
149

×
150
                servicesLister: servicesInformer.Lister(),
×
151
                servicesSynced: servicesInformer.Informer().HasSynced,
×
152
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
153

×
154
                caSecretLister: caSecretInformer.Lister(),
×
155
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
156
                ipsecQueue:     newTypedRateLimitingQueue[string]("IPSecCA", nil),
×
157

×
NEW
158
                serviceCIDRStore: util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
NEW
159
                serviceCIDRInformerFactory: informers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
NEW
160
                        informers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
NEW
161
                                listOption.AllowWatchBookmarks = true
×
NEW
162
                        }),
×
163
                ),
164

165
                recorder: recorder,
166
                k8sExec:  k8sexec.New(),
167

168
                fdbSyncChan: make(chan struct{}, 1),
169
        }
170

171
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
172
        if err != nil {
×
173
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
174
        }
×
175
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
176

×
177
        if err = controller.initRuntime(); err != nil {
×
178
                return nil, err
×
179
        }
×
180

181
        podInformerFactory.Start(stopCh)
×
182
        nodeInformerFactory.Start(stopCh)
×
183
        kubeovnInformerFactory.Start(stopCh)
×
184
        caSecretInformerFactory.Start(stopCh)
×
NEW
185
        controller.StartServiceCIDRInformerFactory(stopCh)
×
186

×
187
        if !cache.WaitForCacheSync(stopCh,
×
188
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
189
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
190
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
191
        }
×
192

193
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
194
                AddFunc:    controller.enqueueAddProviderNetwork,
×
195
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
196
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
197
        }); err != nil {
×
198
                return nil, err
×
199
        }
×
200
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
201
                UpdateFunc: controller.enqueueUpdateVlan,
×
202
        }); err != nil {
×
203
                return nil, err
×
204
        }
×
205
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
206
                AddFunc:    controller.enqueueAddSubnet,
×
207
                UpdateFunc: controller.enqueueUpdateSubnet,
×
208
                DeleteFunc: controller.enqueueDeleteSubnet,
×
209
        }); err != nil {
×
210
                return nil, err
×
211
        }
×
212
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
213
                AddFunc:    controller.enqueueAddService,
×
214
                DeleteFunc: controller.enqueueDeleteService,
×
215
                UpdateFunc: controller.enqueueUpdateService,
×
216
        }); err != nil {
×
217
                util.LogFatalAndExit(err, "failed to add service event handler")
×
218
        }
×
219

220
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
221
                UpdateFunc: controller.enqueueUpdatePod,
×
222
        }); err != nil {
×
223
                return nil, err
×
224
        }
×
225
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
226
                AddFunc:    controller.enqueueAddIPSecCA,
×
227
                UpdateFunc: controller.enqueueUpdateIPSecCA,
×
228
        }); err != nil {
×
229
                return nil, err
×
230
        }
×
231
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
232
                UpdateFunc: controller.enqueueUpdateNode,
×
233
        }); err != nil {
×
234
                return nil, err
×
235
        }
×
236

237
        if controller.vswitchClient, err = ovs.NewVswitchClient("unix:/var/run/openvswitch/db.sock", 1, 3); err != nil {
×
238
                return nil, fmt.Errorf("failed to create vswitch client: %w", err)
×
239
        }
×
240

241
        return controller, nil
×
242
}
243

244
func (c *Controller) enqueueAddIPSecCA(obj any) {
×
245
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
246
        klog.V(3).Infof("enqueue add CA %s", key)
×
247
        c.ipsecQueue.Add(key)
×
248
}
×
249

250
func (c *Controller) enqueueUpdateIPSecCA(oldObj, newObj any) {
×
251
        oldSecret := oldObj.(*v1.Secret)
×
252
        newSecret := newObj.(*v1.Secret)
×
253
        if maps.EqualFunc(oldSecret.Data, newSecret.Data, bytes.Equal) {
×
254
                // No changes in CA data, no need to enqueue
×
255
                return
×
256
        }
×
257

258
        key := cache.MetaObjectToName(newSecret).String()
×
259
        klog.V(3).Infof("enqueue update CA %s", key)
×
260
        c.ipsecQueue.Add(key)
×
261
}
262

263
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
264
        oldNode := oldObj.(*v1.Node)
×
265
        newNode := newObj.(*v1.Node)
×
266
        if newNode.Name != c.config.NodeName {
×
267
                return
×
268
        }
×
269
        if oldNode.Annotations[util.NodeNetworksAnnotation] != newNode.Annotations[util.NodeNetworksAnnotation] {
×
270
                klog.V(3).Infof("enqueue update node %s for node networks change", newNode.Name)
×
271
                c.updateNodeQueue.Add(newNode.Name)
×
272
        }
×
273
}
274

275
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
276
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
277
        klog.V(3).Infof("enqueue add provider network %s", key)
×
278
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
279
}
×
280

281
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
282
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
283
        klog.V(3).Infof("enqueue update provider network %s", key)
×
284
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
285
}
×
286

287
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
288
        var pn *kubeovnv1.ProviderNetwork
×
289
        switch t := obj.(type) {
×
290
        case *kubeovnv1.ProviderNetwork:
×
291
                pn = t
×
292
        case cache.DeletedFinalStateUnknown:
×
293
                p, ok := t.Obj.(*kubeovnv1.ProviderNetwork)
×
294
                if !ok {
×
295
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
296
                        return
×
297
                }
×
298
                pn = p
×
299
        default:
×
300
                klog.Warningf("unexpected type: %T", obj)
×
301
                return
×
302
        }
303

304
        key := cache.MetaObjectToName(pn).String()
×
305
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
306
        c.deleteProviderNetworkQueue.Add(pn)
×
307
}
308

309
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
310
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
311
        }
×
312
}
313

314
func (c *Controller) runDeleteProviderNetworkWorker() {
×
315
        for c.processNextDeleteProviderNetworkWorkItem() {
×
316
        }
×
317
}
318

319
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
320
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
321
        if shutdown {
×
322
                return false
×
323
        }
×
324

325
        err := func(key string) error {
×
326
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
327
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
328
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
329
                }
×
330
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
331
                return nil
×
332
        }(key)
333
        if err != nil {
×
334
                utilruntime.HandleError(err)
×
335
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
336
                return true
×
337
        }
×
338
        return true
×
339
}
340

341
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
342
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
343
        if shutdown {
×
344
                return false
×
345
        }
×
346

347
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
348
                defer c.deleteProviderNetworkQueue.Done(obj)
×
349
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
350
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
351
                }
×
352
                c.deleteProviderNetworkQueue.Forget(obj)
×
353
                return nil
×
354
        }(obj)
355
        if err != nil {
×
356
                utilruntime.HandleError(err)
×
357
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
358
                return true
×
359
        }
×
360
        return true
×
361
}
362

363
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
364
        klog.V(3).Infof("handle update provider network %s", key)
×
365
        node, err := c.nodesLister.Get(c.config.NodeName)
×
366
        if err != nil {
×
367
                klog.Error(err)
×
368
                return err
×
369
        }
×
370
        pn, err := c.providerNetworksLister.Get(key)
×
371
        if err != nil {
×
372
                if k8serrors.IsNotFound(err) {
×
373
                        return nil
×
374
                }
×
375
                klog.Error(err)
×
376
                return err
×
377
        }
378

379
        // Skip initialization if the provider network is being deleted.
380
        // Without this check, a requeue from a previous error could trigger re-init
381
        // during deletion, adding the NIC as a port to a dying bridge. This creates
382
        // stale OVS netdev cache entries that block exchange-link-name bridge creation.
383
        if !pn.DeletionTimestamp.IsZero() {
×
384
                klog.V(3).Infof("provider network %s is being deleted, skip init", key)
×
385
                return nil
×
386
        }
×
387

388
        excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
389
        if err != nil {
×
390
                klog.Error(err)
×
391
                return err
×
392
        }
×
393

394
        if excluded {
×
395
                c.recordProviderNetworkErr(pn.Name, "")
×
396
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
397
        }
×
398
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
399
}
400

401
func providerNetworkNic(pn *kubeovnv1.ProviderNetwork, nodeName string) string {
1✔
402
        for _, item := range pn.Spec.CustomInterfaces {
1✔
403
                if slices.Contains(item.Nodes, nodeName) {
×
404
                        return item.Interface
×
405
                }
×
406
        }
407
        return pn.Spec.DefaultInterface
1✔
408
}
409

410
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
411
        nic := providerNetworkNic(pn, node.Name)
×
412

×
413
        patch := util.KVPatch{
×
414
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
415
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
416
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
417
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
418
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
419
        }
×
420

×
421
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
422
        for _, vlanName := range pn.Status.Vlans {
×
423
                vlan, err := c.vlansLister.Get(vlanName)
×
424
                if err != nil {
×
425
                        if k8serrors.IsNotFound(err) {
×
426
                                klog.Infof("vlan %s not found", vlanName)
×
427
                                continue
×
428
                        }
429
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
430
                        return err
×
431
                }
432
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
433
        }
434
        // always add trunk 0 so that the ovs bridge can communicate with the external network
435
        vlans.Add("0")
×
436

×
437
        // Auto-create VLAN subinterface if enabled and nic contains VLAN ID
×
438
        if pn.Spec.AutoCreateVlanSubinterfaces && strings.Contains(nic, ".") {
×
439
                parts := strings.SplitN(nic, ".", 2)
×
440
                parentIf := parts[0]
×
441
                if !util.CheckInterfaceExists(nic) {
×
442
                        klog.Infof("Auto-create enabled: creating default VLAN subinterface %s on %s", nic, parentIf)
×
443
                        if err := c.createVlanSubinterfaces([]string{nic}, parentIf, pn.Name); err != nil {
×
444
                                klog.Errorf("Failed to create default VLAN subinterface %s: %v", nic, err)
×
445
                                return err
×
446
                        }
×
447
                } else {
×
448
                        klog.V(3).Infof("Default VLAN subinterface %s already exists, skipping creation", nic)
×
449
                }
×
450
        }
451

452
        // VLAN sub-interface handling - use map for efficiency
453
        vlanInterfaceMap := make(map[string]int) // interfaceName -> vlanID
×
454

×
455
        // Process explicitly specified VLAN interfaces
×
456
        if len(pn.Spec.VlanInterfaces) > 0 {
×
457
                klog.Infof("Processing %d explicitly specified VLAN interfaces", len(pn.Spec.VlanInterfaces))
×
458
                for _, vlanIfName := range pn.Spec.VlanInterfaces {
×
459
                        if util.CheckInterfaceExists(vlanIfName) {
×
460
                                // Extract VLAN ID from interface name (e.g., "eth0.10" -> 10)
×
461
                                vlanID, err := util.ExtractVlanIDFromInterface(vlanIfName)
×
462
                                if err != nil {
×
463
                                        klog.Warningf("Failed to extract VLAN ID from interface %s: %v", vlanIfName, err)
×
464
                                        continue
×
465
                                }
466
                                vlanInterfaceMap[vlanIfName] = vlanID
×
467
                                vlans.Add(strconv.Itoa(vlanID))
×
468
                                klog.V(3).Infof("Added explicit VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
469
                        } else {
×
470
                                klog.Warningf("Explicitly specified VLAN interface %s does not exist, skipping", vlanIfName)
×
471
                        }
×
472
                }
473
        }
474

475
        // Auto-detection of additional VLAN interfaces (if enabled)
476
        if pn.Spec.PreserveVlanInterfaces {
×
477
                klog.Infof("Auto-detecting VLAN interfaces on %s", nic)
×
478
                vlanIDs := util.DetectVlanInterfaces(nic)
×
479
                for _, vlanID := range vlanIDs {
×
480
                        vlanIfName := fmt.Sprintf("%s.%d", nic, vlanID)
×
481
                        // Only add if not already explicitly specified
×
482
                        if _, exists := vlanInterfaceMap[vlanIfName]; !exists {
×
483
                                vlanInterfaceMap[vlanIfName] = vlanID
×
484
                                vlans.Add(strconv.Itoa(vlanID))
×
485
                                klog.V(3).Infof("Auto-detected VLAN interface %s (VLAN ID %d)", vlanIfName, vlanID)
×
486
                        } else {
×
487
                                klog.V(3).Infof("VLAN interface %s already explicitly specified, skipping auto-detection", vlanIfName)
×
488
                        }
×
489
                }
490
                klog.Infof("Auto-detected %d additional VLAN interfaces for %s", len(vlanIDs), nic)
×
491
        }
492

493
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, nic, vlanInterfaceMap); err != nil {
×
494
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
495
                return err
×
496
        }
×
497

498
        var mtu int
×
499
        var err error
×
500
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
501
        // Configure main interface with ALL VLANs (including detected ones) in trunk
×
502
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback, vlanInterfaceMap); err != nil {
×
503
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
504
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
505
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
506
                }
×
507
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
508
                return err
×
509
        }
510

511
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
512
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
513
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
514
        if len(vlanInterfaceMap) > 0 {
×
515
                patch[fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name)] = "true"
×
516
        }
×
517
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
518
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
519
                return err
×
520
        }
×
521
        c.recordProviderNetworkErr(pn.Name, "")
×
522
        return nil
×
523
}
524

525
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
526
        pod, err := c.podsLister.Pods(c.config.PodNamespace).Get(c.config.PodName)
×
527
        if err != nil {
×
528
                klog.Errorf("failed to get pod %s/%s, %v", c.config.PodNamespace, c.config.PodName, err)
×
529
                return
×
530
        }
×
531

532
        patch := util.KVPatch{}
×
533
        if pod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
534
                if errMsg == "" {
×
535
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
536
                } else {
×
537
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
538
                }
×
539
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
540
                        klog.Errorf("failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
541
                        return
×
542
                }
×
543
        }
544
}
545

546
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
1✔
547
        patch := util.KVPatch{
1✔
548
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
1✔
549
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
1✔
550
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
1✔
551
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
1✔
552
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
1✔
553
        }
1✔
554
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
1✔
555
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
556
                return err
×
557
        }
×
558

559
        if err := c.ovsCleanProviderNetwork(pn.Name, providerNetworkNic(pn, node.Name)); err != nil {
2✔
560
                return err
1✔
561
        }
1✔
562

563
        return c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil)
×
564
}
565

566
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
567
        if err := c.ovsCleanProviderNetwork(pn.Name, providerNetworkNic(pn, c.config.NodeName)); err != nil {
×
568
                klog.Error(err)
×
569
                return err
×
570
        }
×
571

572
        if err := c.cleanupAutoCreatedVlanInterfaces(pn.Name, "", nil); err != nil {
×
573
                klog.Errorf("Failed to cleanup auto-created VLAN interfaces for provider %s: %v", pn.Name, err)
×
574
                return err
×
575
        }
×
576

577
        node, err := c.nodesLister.Get(c.config.NodeName)
×
578
        if err != nil {
×
579
                klog.Error(err)
×
580
                return err
×
581
        }
×
582
        if len(node.Labels) == 0 {
×
583
                return nil
×
584
        }
×
585

586
        patch := util.KVPatch{
×
587
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
588
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
589
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
590
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
591
                fmt.Sprintf(util.ProviderNetworkVlanIntTemplate, pn.Name):   nil,
×
592
        }
×
593
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
594
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
595
                return err
×
596
        }
×
597

598
        return nil
×
599
}
600

601
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
602
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
603
        newVlan := newObj.(*kubeovnv1.Vlan)
×
604
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
605
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
606
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
607
        }
×
608
}
609

610
type subnetEvent struct {
611
        oldObj, newObj any
612
}
613

614
type serviceEvent struct {
615
        oldObj, newObj any
616
}
617

618
func (c *Controller) enqueueAddSubnet(obj any) {
×
619
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
620
}
×
621

622
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
623
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
624
}
×
625

626
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
627
        switch t := obj.(type) {
×
628
        case *kubeovnv1.Subnet:
×
629
                c.subnetQueue.Add(&subnetEvent{oldObj: t})
×
630
        case cache.DeletedFinalStateUnknown:
×
631
                subnet, ok := t.Obj.(*kubeovnv1.Subnet)
×
632
                if !ok {
×
633
                        klog.Warningf("unexpected object type in tombstone: %T", t.Obj)
×
634
                        return
×
635
                }
×
636
                c.subnetQueue.Add(&subnetEvent{oldObj: subnet})
×
637
        default:
×
638
                klog.Warningf("unexpected type: %T", obj)
×
639
        }
640
}
641

642
func (c *Controller) runSubnetWorker() {
×
643
        for c.processNextSubnetWorkItem() {
×
644
        }
×
645
}
646

647
func (c *Controller) enqueueAddService(obj any) {
×
648
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
649
}
×
650

651
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
652
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
653
}
×
654

655
func (c *Controller) enqueueDeleteService(obj any) {
×
656
        switch t := obj.(type) {
×
657
        case *v1.Service:
×
658
                c.serviceQueue.Add(&serviceEvent{oldObj: t})
×
659
        case cache.DeletedFinalStateUnknown:
×
660
                svc, ok := t.Obj.(*v1.Service)
×
661
                if !ok {
×
662
                        klog.Warningf("unexpected object type in tombstone: %T", t.Obj)
×
663
                        return
×
664
                }
×
665
                c.serviceQueue.Add(&serviceEvent{oldObj: svc})
×
666
        default:
×
667
                klog.Warningf("unexpected type: %T", obj)
×
668
        }
669
}
670

671
func (c *Controller) runAddOrUpdateServiceWorker() {
×
672
        for c.processNextServiceWorkItem() {
×
673
        }
×
674
}
675

676
func (c *Controller) processNextSubnetWorkItem() bool {
×
677
        obj, shutdown := c.subnetQueue.Get()
×
678
        if shutdown {
×
679
                return false
×
680
        }
×
681

682
        err := func(obj *subnetEvent) error {
×
683
                defer c.subnetQueue.Done(obj)
×
684
                c.requestFdbSync()
×
685
                if err := c.reconcileRouters(obj); err != nil {
×
686
                        c.subnetQueue.AddRateLimited(obj)
×
687
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
688
                }
×
689
                c.subnetQueue.Forget(obj)
×
690
                return nil
×
691
        }(obj)
692
        if err != nil {
×
693
                utilruntime.HandleError(err)
×
694
                return true
×
695
        }
×
696
        return true
×
697
}
698

699
func (c *Controller) processNextServiceWorkItem() bool {
×
700
        obj, shutdown := c.serviceQueue.Get()
×
701
        if shutdown {
×
702
                return false
×
703
        }
×
704

705
        err := func(obj *serviceEvent) error {
×
706
                defer c.serviceQueue.Done(obj)
×
707
                if err := c.reconcileServices(obj); err != nil {
×
708
                        c.serviceQueue.AddRateLimited(obj)
×
709
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
710
                }
×
711
                c.serviceQueue.Forget(obj)
×
712
                return nil
×
713
        }(obj)
714
        if err != nil {
×
715
                utilruntime.HandleError(err)
×
716
                return true
×
717
        }
×
718
        return true
×
719
}
720

721
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
722
        oldPod := oldObj.(*v1.Pod)
×
723
        newPod := newObj.(*v1.Pod)
×
724
        key := cache.MetaObjectToName(newPod).String()
×
725

×
726
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
727
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
728
                oldPod.Annotations[util.IngressBurstAnnotation] != newPod.Annotations[util.IngressBurstAnnotation] ||
×
729
                oldPod.Annotations[util.EgressBurstAnnotation] != newPod.Annotations[util.EgressBurstAnnotation] ||
×
730
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
731
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
732
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
733
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
734
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
735
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
736
                c.updatePodQueue.Add(key)
×
737
                return
×
738
        }
×
739

740
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
741
        if err != nil {
×
742
                return
×
743
        }
×
744
        for _, multiNet := range attachNets {
×
745
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
746
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
747
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
748
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
749
                                oldPod.Annotations[fmt.Sprintf(util.IngressBurstAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressBurstAnnotationTemplate, provider)] ||
×
750
                                oldPod.Annotations[fmt.Sprintf(util.EgressBurstAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressBurstAnnotationTemplate, provider)] ||
×
751
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
752
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
753
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
754
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
755
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
756
                                c.updatePodQueue.Add(key)
×
757
                        }
×
758
                }
759
        }
760
}
761

762
func (c *Controller) runUpdatePodWorker() {
×
763
        for c.processNextUpdatePodWorkItem() {
×
764
        }
×
765
}
766

767
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
768
        key, shutdown := c.updatePodQueue.Get()
×
769
        if shutdown {
×
770
                return false
×
771
        }
×
772

773
        err := func(key string) error {
×
774
                defer c.updatePodQueue.Done(key)
×
775
                if err := c.handleUpdatePod(key); err != nil {
×
776
                        c.updatePodQueue.AddRateLimited(key)
×
777
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
778
                }
×
779
                c.updatePodQueue.Forget(key)
×
780
                return nil
×
781
        }(key)
782
        if err != nil {
×
783
                utilruntime.HandleError(err)
×
784
                return true
×
785
        }
×
786
        return true
×
787
}
788

789
// isVMLauncherPodAlive checks whether any KubeVirt launcher pod exists for the given VMI name.
790
// It tries multiple lookup strategies to handle different KubeVirt versions:
791
//  1. VirtualMachineInstanceIDLabel (vmi.kubevirt.io/id) — unique, available in KubeVirt >= 1.7
792
//  2. DeprecatedVirtualMachineNameLabel (vm.kubevirt.io/name) — may not be unique when VM hostname is set
793
//  3. DomainAnnotation (kubevirt.io/domain) — always the real VMI name, handles names > 63 chars
794
func (c *Controller) isVMLauncherPodAlive(namespace, vmiName, iface string) bool {
1✔
795
        // Try the new unique label first (KubeVirt >= 1.7)
1✔
796
        selector := labels.SelectorFromSet(map[string]string{kubevirtv1.VirtualMachineInstanceIDLabel: vmiName})
1✔
797
        launcherPods, err := c.podsLister.Pods(namespace).List(selector)
1✔
798
        if err != nil {
1✔
799
                klog.Errorf("failed to list launcher pods by %s for vmi %s/%s: %v",
×
800
                        kubevirtv1.VirtualMachineInstanceIDLabel, namespace, vmiName, err)
×
801
                return false
×
802
        }
×
803
        if len(launcherPods) > 0 {
2✔
804
                klog.V(5).Infof("found %d launcher pod(s) by %s for vmi %s/%s, keeping ovs interface %s",
1✔
805
                        len(launcherPods), kubevirtv1.VirtualMachineInstanceIDLabel, namespace, vmiName, iface)
1✔
806
                return true
1✔
807
        }
1✔
808

809
        // Fall back to the deprecated label for older KubeVirt versions
810
        selector = labels.SelectorFromSet(map[string]string{kubevirtv1.DeprecatedVirtualMachineNameLabel: vmiName})
1✔
811
        launcherPods, err = c.podsLister.Pods(namespace).List(selector)
1✔
812
        if err != nil {
1✔
813
                klog.Errorf("failed to list launcher pods by %s for vmi %s/%s: %v",
×
814
                        kubevirtv1.DeprecatedVirtualMachineNameLabel, namespace, vmiName, err)
×
815
                return false
×
816
        }
×
817
        if len(launcherPods) > 0 {
2✔
818
                klog.V(5).Infof("found %d launcher pod(s) by %s for vmi %s/%s, keeping ovs interface %s",
1✔
819
                        len(launcherPods), kubevirtv1.DeprecatedVirtualMachineNameLabel, namespace, vmiName, iface)
1✔
820
                return true
1✔
821
        }
1✔
822

823
        // Final fallback: for VMI names > 63 chars where VirtualMachineInstanceIDLabel is hashed,
824
        // match by kubevirt.io/domain annotation which always contains the real VMI name.
825
        // Use the deprecated label as a selector to narrow down to virt-launcher pods only,
826
        // then match the annotation for the exact VMI name.
827
        selector = labels.Everything()
1✔
828
        if req, err := labels.NewRequirement(kubevirtv1.DeprecatedVirtualMachineNameLabel, selection.Exists, nil); err == nil {
2✔
829
                selector = selector.Add(*req)
1✔
830
        }
1✔
831
        candidates, err := c.podsLister.Pods(namespace).List(selector)
1✔
832
        if err != nil {
1✔
833
                klog.Errorf("failed to list virt-launcher pods in namespace %s for vmi annotation lookup: %v", namespace, err)
×
834
                return false
×
835
        }
×
836
        for _, p := range candidates {
2✔
837
                if p.Annotations[kubevirtv1.DomainAnnotation] == vmiName {
2✔
838
                        klog.V(5).Infof("found launcher pod %s by %s annotation for vmi %s/%s, keeping ovs interface %s",
1✔
839
                                p.Name, kubevirtv1.DomainAnnotation, namespace, vmiName, iface)
1✔
840
                        return true
1✔
841
                }
1✔
842
        }
843

844
        return false
1✔
845
}
846

847
func (c *Controller) gcInterfaces() {
×
848
        interfacePodMap, err := ovs.ListInterfacePodMap()
×
849
        if err != nil {
×
850
                klog.Errorf("failed to list interface pod map: %v", err)
×
851
                return
×
852
        }
×
853
        for iface, pod := range interfacePodMap {
×
854
                parts := strings.Split(pod, "/")
×
855
                if len(parts) < 3 {
×
856
                        klog.Errorf("malformed pod string %q for interface %s, expected format 'namespace/name/errText'", pod, iface)
×
857
                        continue
×
858
                }
859

860
                podNamespace, podName, errText := parts[0], parts[1], parts[2]
×
861
                if strings.Contains(errText, "No such device") {
×
862
                        klog.Infof("pod %s/%s not found, delete ovs interface %s", podNamespace, podName, iface)
×
863
                        if err := ovs.CleanInterface(iface); err != nil {
×
864
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
865
                        }
×
866
                        continue
×
867
                }
868

869
                if _, err = c.podsLister.Pods(podNamespace).Get(podName); err != nil {
×
870
                        if !k8serrors.IsNotFound(err) {
×
871
                                klog.Errorf("failed to get pod %s/%s: %v", podNamespace, podName, err)
×
872
                                continue
×
873
                        }
874

875
                        // Pod not found by name. Check if this might be a KubeVirt VM.
876
                        // For KubeVirt VMs, the pod_name in OVS external_ids is set to the VMI name (not the launcher pod name).
877
                        // Try to find launcher pods using KubeVirt labels/annotations.
878
                        if c.isVMLauncherPodAlive(podNamespace, podName, iface) {
×
879
                                continue
×
880
                        }
881

882
                        // No pod on this node and no launcher pod found - safe to delete
883
                        klog.Infof("pod %s/%s not found on this node, delete ovs interface %s", podNamespace, podName, iface)
×
884
                        if err = ovs.CleanInterface(iface); err != nil {
×
885
                                klog.Errorf("failed to clean ovs interface %s: %v", iface, err)
×
886
                        }
×
887
                }
888
        }
889
}
890

891
func (c *Controller) runIPSecWorker() {
×
892
        for c.processNextIPSecWorkItem() {
×
893
        }
×
894
}
895

896
func (c *Controller) processNextIPSecWorkItem() bool {
×
897
        key, shutdown := c.ipsecQueue.Get()
×
898
        if shutdown {
×
899
                return false
×
900
        }
×
901
        defer c.ipsecQueue.Done(key)
×
902

×
903
        err := func(key string) error {
×
904
                if err := c.SyncIPSecKeys(key); err != nil {
×
905
                        c.ipsecQueue.AddRateLimited(key)
×
906
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
907
                }
×
908
                c.ipsecQueue.Forget(key)
×
909
                return nil
×
910
        }(key)
911
        if err != nil {
×
912
                utilruntime.HandleError(err)
×
913
                return true
×
914
        }
×
915
        return true
×
916
}
917

918
func (c *Controller) runUpdateNodeWorker() {
×
919
        for c.processNextUpdateNodeWorkItem() {
×
920
        }
×
921
}
922

923
func (c *Controller) processNextUpdateNodeWorkItem() bool {
×
924
        key, shutdown := c.updateNodeQueue.Get()
×
925
        if shutdown {
×
926
                return false
×
927
        }
×
928

929
        err := func(key string) error {
×
930
                defer c.updateNodeQueue.Done(key)
×
931
                if err := c.handleUpdateNode(key); err != nil {
×
932
                        c.updateNodeQueue.AddRateLimited(key)
×
933
                        return fmt.Errorf("error syncing node %q: %w, requeuing", key, err)
×
934
                }
×
935
                c.updateNodeQueue.Forget(key)
×
936
                return nil
×
937
        }(key)
938
        if err != nil {
×
939
                utilruntime.HandleError(err)
×
940
                return true
×
941
        }
×
942
        return true
×
943
}
944

945
func (c *Controller) handleUpdateNode(key string) error {
×
946
        node, err := c.nodesLister.Get(key)
×
947
        if err != nil {
×
948
                if k8serrors.IsNotFound(err) {
×
949
                        return nil
×
950
                }
×
951
                klog.Error(err)
×
952
                return err
×
953
        }
954

955
        klog.Infof("updating node networks for node %s", key)
×
956
        return c.config.UpdateNodeNetworks(node)
×
957
}
958

959
// Run starts controller
960
func (c *Controller) Run(stopCh <-chan struct{}) {
×
961
        defer utilruntime.HandleCrash()
×
962
        defer c.addOrUpdateProviderNetworkQueue.ShutDown()
×
963
        defer c.deleteProviderNetworkQueue.ShutDown()
×
964
        defer c.subnetQueue.ShutDown()
×
965
        defer c.serviceQueue.ShutDown()
×
966
        defer c.updatePodQueue.ShutDown()
×
967
        defer c.ipsecQueue.ShutDown()
×
968
        defer c.updateNodeQueue.ShutDown()
×
969
        defer c.vswitchClient.Close()
×
970

×
971
        go wait.Until(c.gcInterfaces, time.Minute, stopCh)
×
972
        go wait.Until(recompute, 10*time.Minute, stopCh)
×
973
        go wait.Until(rotateLog, 1*time.Hour, stopCh)
×
974

×
975
        if err := c.setIPSet(); err != nil {
×
976
                util.LogFatalAndExit(err, "failed to set ipsets")
×
977
        }
×
978

979
        klog.Info("Started workers")
×
980
        go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
×
981
        go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
×
982
        go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
×
983
        go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
×
984
        go wait.Until(c.runAddOrUpdateServiceWorker, time.Second, stopCh)
×
985
        go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
×
986
        go wait.Until(c.runSubnetWorker, time.Second, stopCh)
×
987
        go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
×
988
        go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
×
989
        go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
×
990
        go wait.Until(c.runGateway, 3*time.Second, stopCh)
×
991
        go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
×
992
        go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
×
993
        go wait.Until(func() {
×
994
                if err := c.reconcileRouters(nil); err != nil {
×
995
                        klog.Errorf("failed to reconcile %s routes: %v", util.NodeNic, err)
×
996
                }
×
997
        }, 3*time.Second, stopCh)
998

999
        if c.config.EnableTProxy {
×
1000
                go c.StartTProxyForwarding()
×
1001
                go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
×
1002
                // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
×
1003
                // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
×
1004
                // kubelet to tproxy, if probe success recover the iptable rules
×
1005
                go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
×
1006
        } else {
×
1007
                c.cleanTProxyConfig()
×
1008
        }
×
1009

1010
        if !c.config.EnableOVNIPSec {
×
1011
                if err := c.StopAndClearIPSecResource(); err != nil {
×
1012
                        klog.Errorf("stop and clear ipsec resource error: %v", err)
×
1013
                }
×
1014
        }
1015

1016
        // Start OpenFlow sync loop
1017
        go c.runFlowSync(stopCh)
×
1018

×
1019
        // start fdb sync loop
×
1020
        go c.runFdbSync(stopCh)
×
1021

×
1022
        <-stopCh
×
1023
        klog.Info("Shutting down workers")
×
1024
}
1025

1026
func recompute() {
×
1027
        output, err := ovs.Appctl(ovs.OvnController, "inc-engine/recompute")
×
1028
        if err != nil {
×
1029
                klog.Errorf("failed to trigger force recompute for %s: %q", ovs.OvnController, output)
×
1030
        }
×
1031
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc