• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16158575117

09 Jul 2025 02:04AM UTC coverage: 21.396% (-0.1%) from 21.536%
16158575117

push

github

web-flow
Use cert-manager to issue certificates for IPSec (#5365)

* Add support for issuing IPSec tunnel certificates using cert-manager.

When cert-manager certificates are enabled, the controller no longer generates the IPSec CA cert or private key stored in the `ovn-ipsec-ca` secret. The secret should be populated with the same CA as configured with cert-manager. It still enables IPSec in OVN NB.

When cert-manager certificates are enabled, the CNI daemon creates cert-manager CertificateRequest resources instead of CSRs. A cert-manager ClusterIssuer should be configured to approve and sign these CertificateRequests with a CA matching the one configured in the `ovn-ipsec-ca` secret. The name of the issuer to use is configurable in the CNI.

The CNI daemon now watches the `ovn-ipsec-ca` secret for changes, allowing a new trust bundle to be rolled out. It verifies that the currently configured certificate is signed by the new bundle and, if not, triggers issuance of a new certificate. The daemon now splits each certificate in the CA bundle into a separate file, as strongSwan is unable to parse multiple CAs from a single file.

The CNI daemon now requests a new certificate once the current certificate is at least halfway to expiry, based on the validity times in the certificate. When generating a new certificate, the daemon also generates a new key in case the previous one was leaked. The certificate lifetime is also now configurable rather than being fixed at one year. The CNI no longer restarts the ipsec or ovs-ipsec-monitor services when the certificate changes; it only asks ipsec to reread the CA certificates when they change.

To allow the CNI daemon to keep track of the versions of its key, certificate, and CA certificate files, it now stores them on disk with locally unique names. Keys and certificates are suffixed with the timestamp at which they were generated. CA files are suffixed with the Kubernetes revision number of the `ovn-ipsec-ca` secret.

The cert manager validation webhook (if used) shoul... (continued)

0 of 449 new or added lines in 5 files covered. (0.0%)

7 existing lines in 2 files now uncovered.

10515 of 49145 relevant lines covered (21.4%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller.go
1
package daemon
2

3
import (
4
        "context"
5
        "fmt"
6
        "os/exec"
7
        "slices"
8
        "strconv"
9
        "time"
10

11
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
12
        "github.com/scylladb/go-set/strset"
13
        v1 "k8s.io/api/core/v1"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17
        "k8s.io/apimachinery/pkg/util/wait"
18
        "k8s.io/client-go/informers"
19
        "k8s.io/client-go/kubernetes/scheme"
20
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
21
        listerv1 "k8s.io/client-go/listers/core/v1"
22
        "k8s.io/client-go/tools/cache"
23
        "k8s.io/client-go/tools/record"
24
        "k8s.io/client-go/util/workqueue"
25
        "k8s.io/klog/v2"
26
        k8sexec "k8s.io/utils/exec"
27

28
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
29
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
30
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
31
        "github.com/kubeovn/kube-ovn/pkg/ovs"
32
        "github.com/kubeovn/kube-ovn/pkg/util"
33
)
34

35
// Controller watch pod and namespace changes to update iptables, ipset and ovs qos
36
type Controller struct {
37
        config *Configuration
38

39
        providerNetworksLister          kubeovnlister.ProviderNetworkLister
40
        providerNetworksSynced          cache.InformerSynced
41
        addOrUpdateProviderNetworkQueue workqueue.TypedRateLimitingInterface[string]
42
        deleteProviderNetworkQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.ProviderNetwork]
43

44
        vlansLister kubeovnlister.VlanLister
45
        vlansSynced cache.InformerSynced
46

47
        subnetsLister kubeovnlister.SubnetLister
48
        subnetsSynced cache.InformerSynced
49
        subnetQueue   workqueue.TypedRateLimitingInterface[*subnetEvent]
50

51
        ovnEipsLister kubeovnlister.OvnEipLister
52
        ovnEipsSynced cache.InformerSynced
53

54
        podsLister     listerv1.PodLister
55
        podsSynced     cache.InformerSynced
56
        updatePodQueue workqueue.TypedRateLimitingInterface[string]
57
        deletePodQueue workqueue.TypedRateLimitingInterface[string]
58

59
        nodesLister listerv1.NodeLister
60
        nodesSynced cache.InformerSynced
61

62
        servicesLister listerv1.ServiceLister
63
        servicesSynced cache.InformerSynced
64
        serviceQueue   workqueue.TypedRateLimitingInterface[*serviceEvent]
65

66
        caSecretLister listerv1.SecretLister
67
        caSecretSynced cache.InformerSynced
68
        ipsecQueue     workqueue.TypedRateLimitingInterface[string]
69

70
        recorder record.EventRecorder
71

72
        protocol string
73

74
        ControllerRuntime
75
        localPodName   string
76
        localNamespace string
77

78
        k8sExec k8sexec.Interface
79
}
80

81
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
×
82
        if rateLimiter == nil {
×
83
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
×
84
        }
×
85
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
×
86
}
87

88
// NewController init a daemon controller
89
func NewController(config *Configuration,
90
        stopCh <-chan struct{},
91
        podInformerFactory, nodeInformerFactory, caSecretInformerFactory informers.SharedInformerFactory,
92
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory,
NEW
93
) (*Controller, error) {
×
94
        eventBroadcaster := record.NewBroadcaster()
×
95
        eventBroadcaster.StartLogging(klog.Infof)
×
96
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)})
×
97
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})
×
98
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
99
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
100
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
101
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
102
        podInformer := podInformerFactory.Core().V1().Pods()
×
103
        nodeInformer := nodeInformerFactory.Core().V1().Nodes()
×
104
        servicesInformer := nodeInformerFactory.Core().V1().Services()
×
NEW
105
        caSecretInformer := caSecretInformerFactory.Core().V1().Secrets()
×
106

×
107
        controller := &Controller{
×
108
                config: config,
×
109

×
110
                providerNetworksLister:          providerNetworkInformer.Lister(),
×
111
                providerNetworksSynced:          providerNetworkInformer.Informer().HasSynced,
×
112
                addOrUpdateProviderNetworkQueue: newTypedRateLimitingQueue[string]("AddOrUpdateProviderNetwork", nil),
×
113
                deleteProviderNetworkQueue:      newTypedRateLimitingQueue[*kubeovnv1.ProviderNetwork]("DeleteProviderNetwork", nil),
×
114

×
115
                vlansLister: vlanInformer.Lister(),
×
116
                vlansSynced: vlanInformer.Informer().HasSynced,
×
117

×
118
                subnetsLister: subnetInformer.Lister(),
×
119
                subnetsSynced: subnetInformer.Informer().HasSynced,
×
120
                subnetQueue:   newTypedRateLimitingQueue[*subnetEvent]("Subnet", nil),
×
121

×
122
                ovnEipsLister: ovnEipInformer.Lister(),
×
123
                ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
×
124

×
125
                podsLister:     podInformer.Lister(),
×
126
                podsSynced:     podInformer.Informer().HasSynced,
×
127
                updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
×
128
                deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
×
129

×
130
                nodesLister: nodeInformer.Lister(),
×
131
                nodesSynced: nodeInformer.Informer().HasSynced,
×
132

×
133
                servicesLister: servicesInformer.Lister(),
×
134
                servicesSynced: servicesInformer.Informer().HasSynced,
×
135
                serviceQueue:   newTypedRateLimitingQueue[*serviceEvent]("Service", nil),
×
136

×
NEW
137
                caSecretLister: caSecretInformer.Lister(),
×
NEW
138
                caSecretSynced: caSecretInformer.Informer().HasSynced,
×
NEW
139
                ipsecQueue:     newTypedRateLimitingQueue[string]("CA", nil),
×
NEW
140

×
141
                recorder: recorder,
×
142
                k8sExec:  k8sexec.New(),
×
143
        }
×
144

×
145
        node, err := config.KubeClient.CoreV1().Nodes().Get(context.Background(), config.NodeName, metav1.GetOptions{})
×
146
        if err != nil {
×
147
                util.LogFatalAndExit(err, "failed to get node %s info", config.NodeName)
×
148
        }
×
149
        controller.protocol = util.CheckProtocol(node.Annotations[util.IPAddressAnnotation])
×
150

×
151
        if err = controller.initRuntime(); err != nil {
×
152
                return nil, err
×
153
        }
×
154

155
        podInformerFactory.Start(stopCh)
×
156
        nodeInformerFactory.Start(stopCh)
×
157
        kubeovnInformerFactory.Start(stopCh)
×
NEW
158
        caSecretInformerFactory.Start(stopCh)
×
159

×
160
        if !cache.WaitForCacheSync(stopCh,
×
161
                controller.providerNetworksSynced, controller.vlansSynced, controller.subnetsSynced,
×
NEW
162
                controller.podsSynced, controller.nodesSynced, controller.servicesSynced, controller.caSecretSynced) {
×
163
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
164
        }
×
165

166
        if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
167
                AddFunc:    controller.enqueueAddProviderNetwork,
×
168
                UpdateFunc: controller.enqueueUpdateProviderNetwork,
×
169
                DeleteFunc: controller.enqueueDeleteProviderNetwork,
×
170
        }); err != nil {
×
171
                return nil, err
×
172
        }
×
173
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
174
                UpdateFunc: controller.enqueueUpdateVlan,
×
175
        }); err != nil {
×
176
                return nil, err
×
177
        }
×
178
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
179
                AddFunc:    controller.enqueueAddSubnet,
×
180
                UpdateFunc: controller.enqueueUpdateSubnet,
×
181
                DeleteFunc: controller.enqueueDeleteSubnet,
×
182
        }); err != nil {
×
183
                return nil, err
×
184
        }
×
185
        if _, err = servicesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
186
                AddFunc:    controller.enqueueAddService,
×
187
                DeleteFunc: controller.enqueueDeleteService,
×
188
                UpdateFunc: controller.enqueueUpdateService,
×
189
        }); err != nil {
×
190
                util.LogFatalAndExit(err, "failed to add service event handler")
×
191
        }
×
192

193
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
194
                UpdateFunc: controller.enqueueUpdatePod,
×
195
                DeleteFunc: controller.enqueueDeletePod,
×
196
        }); err != nil {
×
197
                return nil, err
×
198
        }
×
NEW
199
        if _, err = caSecretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
NEW
200
                AddFunc:    controller.enqueueAddCA,
×
NEW
201
                UpdateFunc: controller.enqueueUpdateCA,
×
NEW
202
                DeleteFunc: controller.enqueueDeleteCA,
×
NEW
203
        }); err != nil {
×
NEW
204
                return nil, err
×
NEW
205
        }
×
206

207
        return controller, nil
×
208
}
209

NEW
210
func (c *Controller) enqueueAddCA(obj any) {
×
NEW
211
        key := cache.MetaObjectToName(obj.(*v1.Secret)).String()
×
NEW
212
        klog.V(3).Infof("enqueue add CA %s", key)
×
NEW
213
        c.ipsecQueue.Add(key)
×
NEW
214
}
×
215

NEW
216
func (c *Controller) enqueueUpdateCA(_, newObj any) {
×
NEW
217
        key := cache.MetaObjectToName(newObj.(*v1.Secret)).String()
×
NEW
218
        klog.V(3).Infof("enqueue update CA %s", key)
×
NEW
219
        c.ipsecQueue.Add(key)
×
NEW
220
}
×
221

NEW
222
func (c *Controller) enqueueDeleteCA(obj any) {
×
NEW
223
        pn := obj.(*v1.Secret)
×
NEW
224
        key := cache.MetaObjectToName(pn).String()
×
NEW
225
        klog.V(3).Infof("enqueue delete CA %s", key)
×
NEW
226
        c.ipsecQueue.Add(key)
×
NEW
227
}
×
228

229
func (c *Controller) enqueueAddProviderNetwork(obj any) {
×
230
        key := cache.MetaObjectToName(obj.(*kubeovnv1.ProviderNetwork)).String()
×
231
        klog.V(3).Infof("enqueue add provider network %s", key)
×
232
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
233
}
×
234

235
func (c *Controller) enqueueUpdateProviderNetwork(_, newObj any) {
×
236
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.ProviderNetwork)).String()
×
237
        klog.V(3).Infof("enqueue update provider network %s", key)
×
238
        c.addOrUpdateProviderNetworkQueue.Add(key)
×
239
}
×
240

241
func (c *Controller) enqueueDeleteProviderNetwork(obj any) {
×
242
        pn := obj.(*kubeovnv1.ProviderNetwork)
×
243
        key := cache.MetaObjectToName(pn).String()
×
244
        klog.V(3).Infof("enqueue delete provider network %s", key)
×
245
        c.deleteProviderNetworkQueue.Add(pn)
×
246
}
×
247

248
func (c *Controller) runAddOrUpdateProviderNetworkWorker() {
×
249
        for c.processNextAddOrUpdateProviderNetworkWorkItem() {
×
250
        }
×
251
}
252

253
func (c *Controller) runDeleteProviderNetworkWorker() {
×
254
        for c.processNextDeleteProviderNetworkWorkItem() {
×
255
        }
×
256
}
257

258
func (c *Controller) processNextAddOrUpdateProviderNetworkWorkItem() bool {
×
259
        key, shutdown := c.addOrUpdateProviderNetworkQueue.Get()
×
260
        if shutdown {
×
261
                return false
×
262
        }
×
263

264
        err := func(key string) error {
×
265
                defer c.addOrUpdateProviderNetworkQueue.Done(key)
×
266
                if err := c.handleAddOrUpdateProviderNetwork(key); err != nil {
×
267
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
268
                }
×
269
                c.addOrUpdateProviderNetworkQueue.Forget(key)
×
270
                return nil
×
271
        }(key)
272
        if err != nil {
×
273
                utilruntime.HandleError(err)
×
274
                c.addOrUpdateProviderNetworkQueue.AddRateLimited(key)
×
275
                return true
×
276
        }
×
277
        return true
×
278
}
279

280
func (c *Controller) processNextDeleteProviderNetworkWorkItem() bool {
×
281
        obj, shutdown := c.deleteProviderNetworkQueue.Get()
×
282
        if shutdown {
×
283
                return false
×
284
        }
×
285

286
        err := func(obj *kubeovnv1.ProviderNetwork) error {
×
287
                defer c.deleteProviderNetworkQueue.Done(obj)
×
288
                if err := c.handleDeleteProviderNetwork(obj); err != nil {
×
289
                        return fmt.Errorf("error syncing %q: %w, requeuing", obj.Name, err)
×
290
                }
×
291
                c.deleteProviderNetworkQueue.Forget(obj)
×
292
                return nil
×
293
        }(obj)
294
        if err != nil {
×
295
                utilruntime.HandleError(err)
×
296
                c.deleteProviderNetworkQueue.AddRateLimited(obj)
×
297
                return true
×
298
        }
×
299
        return true
×
300
}
301

302
func (c *Controller) handleAddOrUpdateProviderNetwork(key string) error {
×
303
        klog.V(3).Infof("handle update provider network %s", key)
×
304
        node, err := c.nodesLister.Get(c.config.NodeName)
×
305
        if err != nil {
×
306
                klog.Error(err)
×
307
                return err
×
308
        }
×
309
        pn, err := c.providerNetworksLister.Get(key)
×
310
        if err != nil {
×
311
                if k8serrors.IsNotFound(err) {
×
312
                        return nil
×
313
                }
×
314
                klog.Error(err)
×
315
                return err
×
316
        }
317

318
        if slices.Contains(pn.Spec.ExcludeNodes, node.Name) {
×
319
                c.recordProviderNetworkErr(pn.Name, "")
×
320
                return c.cleanProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
321
        }
×
322
        return c.initProviderNetwork(pn.DeepCopy(), node.DeepCopy())
×
323
}
324

325
func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
326
        nic := pn.Spec.DefaultInterface
×
327
        for _, item := range pn.Spec.CustomInterfaces {
×
328
                if slices.Contains(item.Nodes, node.Name) {
×
329
                        nic = item.Interface
×
330
                        break
×
331
                }
332
        }
333

334
        patch := util.KVPatch{
×
335
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
336
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
337
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
338
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
339
        }
×
340

×
341
        vlans := strset.NewWithSize(len(pn.Status.Vlans) + 1)
×
342
        for _, vlanName := range pn.Status.Vlans {
×
343
                vlan, err := c.vlansLister.Get(vlanName)
×
344
                if err != nil {
×
345
                        if k8serrors.IsNotFound(err) {
×
346
                                klog.Infof("vlan %s not found", vlanName)
×
347
                                continue
×
348
                        }
349
                        klog.Errorf("failed to get vlan %q: %v", vlanName, err)
×
350
                        return err
×
351
                }
352
                vlans.Add(strconv.Itoa(vlan.Spec.ID))
×
353
        }
354
        // always add trunk 0 so that the ovs bridge can communicate with the external network
355
        vlans.Add("0")
×
356

×
357
        var mtu int
×
358
        var err error
×
359
        klog.V(3).Infof("ovs init provider network %s", pn.Name)
×
360
        if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil {
×
361
                delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name))
×
362
                if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil {
×
363
                        klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1)
×
364
                }
×
365
                c.recordProviderNetworkErr(pn.Name, err.Error())
×
366
                return err
×
367
        }
368

369
        patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true"
×
370
        patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic
×
371
        patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu)
×
372
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
373
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
374
                return err
×
375
        }
×
376
        c.recordProviderNetworkErr(pn.Name, "")
×
377
        return nil
×
378
}
379

380
func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) {
×
381
        var currentPod *v1.Pod
×
382
        var err error
×
383
        if c.localPodName == "" {
×
384
                pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
×
385
                        LabelSelector: "app=kube-ovn-cni",
×
386
                        FieldSelector: "spec.nodeName=" + c.config.NodeName,
×
387
                })
×
388
                if err != nil {
×
389
                        klog.Errorf("failed to list pod: %v", err)
×
390
                        return
×
391
                }
×
392
                for _, pod := range pods.Items {
×
393
                        if pod.Spec.NodeName == c.config.NodeName && pod.Status.Phase == v1.PodRunning {
×
394
                                c.localPodName = pod.Name
×
395
                                c.localNamespace = pod.Namespace
×
396
                                currentPod = &pod
×
397
                                break
×
398
                        }
399
                }
400
                if currentPod == nil {
×
401
                        klog.Warning("failed to get self pod")
×
402
                        return
×
403
                }
×
404
        } else {
×
405
                if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil {
×
406
                        klog.Errorf("failed to get pod %s, %v", c.localPodName, err)
×
407
                        return
×
408
                }
×
409
        }
410

411
        patch := util.KVPatch{}
×
412
        if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg {
×
413
                if errMsg == "" {
×
414
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil
×
415
                } else {
×
416
                        patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg
×
417
                }
×
418
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil {
×
419
                        klog.Errorf("failed to patch pod %s: %v", c.localPodName, err)
×
420
                        return
×
421
                }
×
422
        }
423
}
424

425
func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error {
×
426
        patch := util.KVPatch{
×
427
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
428
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
429
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
430
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   "true",
×
431
        }
×
432
        if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
433
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
434
                return err
×
435
        }
×
436

437
        return c.ovsCleanProviderNetwork(pn.Name)
×
438
}
439

440
func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) error {
×
441
        if err := c.ovsCleanProviderNetwork(pn.Name); err != nil {
×
442
                klog.Error(err)
×
443
                return err
×
444
        }
×
445

446
        node, err := c.nodesLister.Get(c.config.NodeName)
×
447
        if err != nil {
×
448
                klog.Error(err)
×
449
                return err
×
450
        }
×
451
        if len(node.Labels) == 0 {
×
452
                return nil
×
453
        }
×
454

455
        patch := util.KVPatch{
×
456
                fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name):     nil,
×
457
                fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil,
×
458
                fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name):       nil,
×
459
                fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name):   nil,
×
460
        }
×
461
        if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
462
                klog.Errorf("failed to patch labels of node %s: %v", node.Name, err)
×
463
                return err
×
464
        }
×
465

466
        return nil
×
467
}
468

469
func (c *Controller) enqueueUpdateVlan(oldObj, newObj any) {
×
470
        oldVlan := oldObj.(*kubeovnv1.Vlan)
×
471
        newVlan := newObj.(*kubeovnv1.Vlan)
×
472
        if oldVlan.Spec.ID != newVlan.Spec.ID {
×
473
                klog.V(3).Infof("enqueue update provider network %q", newVlan.Spec.Provider)
×
474
                c.addOrUpdateProviderNetworkQueue.Add(newVlan.Spec.Provider)
×
475
        }
×
476
}
477

478
// subnetEvent is a queued subnet notification. Add events set only newObj,
// delete events set only oldObj, and update events set both.
type subnetEvent struct {
	oldObj any
	newObj any
}
481

482
// serviceEvent is a queued service notification. Add events set only newObj,
// delete events set only oldObj, and update events set both.
type serviceEvent struct {
	oldObj any
	newObj any
}
485

486
func (c *Controller) enqueueAddSubnet(obj any) {
×
487
        c.subnetQueue.Add(&subnetEvent{newObj: obj})
×
488
}
×
489

490
func (c *Controller) enqueueUpdateSubnet(oldObj, newObj any) {
×
491
        c.subnetQueue.Add(&subnetEvent{oldObj: oldObj, newObj: newObj})
×
492
}
×
493

494
func (c *Controller) enqueueDeleteSubnet(obj any) {
×
495
        c.subnetQueue.Add(&subnetEvent{oldObj: obj})
×
496
}
×
497

498
func (c *Controller) runSubnetWorker() {
×
499
        for c.processNextSubnetWorkItem() {
×
500
        }
×
501
}
502

503
func (c *Controller) enqueueAddService(obj any) {
×
504
        c.serviceQueue.Add(&serviceEvent{newObj: obj})
×
505
}
×
506

507
func (c *Controller) enqueueUpdateService(oldObj, newObj any) {
×
508
        c.serviceQueue.Add(&serviceEvent{oldObj: oldObj, newObj: newObj})
×
509
}
×
510

511
func (c *Controller) enqueueDeleteService(obj any) {
×
512
        c.serviceQueue.Add(&serviceEvent{oldObj: obj})
×
513
}
×
514

515
func (c *Controller) runAddOrUpdateServicekWorker() {
×
516
        for c.processNextServiceWorkItem() {
×
517
        }
×
518
}
519

520
func (c *Controller) processNextSubnetWorkItem() bool {
×
521
        obj, shutdown := c.subnetQueue.Get()
×
522
        if shutdown {
×
523
                return false
×
524
        }
×
525

526
        err := func(obj *subnetEvent) error {
×
527
                defer c.subnetQueue.Done(obj)
×
528
                if err := c.reconcileRouters(obj); err != nil {
×
529
                        c.subnetQueue.AddRateLimited(obj)
×
530
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
531
                }
×
532
                c.subnetQueue.Forget(obj)
×
533
                return nil
×
534
        }(obj)
535
        if err != nil {
×
536
                utilruntime.HandleError(err)
×
537
                return true
×
538
        }
×
539
        return true
×
540
}
541

542
func (c *Controller) processNextServiceWorkItem() bool {
×
543
        obj, shutdown := c.serviceQueue.Get()
×
544
        if shutdown {
×
545
                return false
×
546
        }
×
547

548
        err := func(obj *serviceEvent) error {
×
549
                defer c.serviceQueue.Done(obj)
×
550
                if err := c.reconcileServices(obj); err != nil {
×
551
                        c.serviceQueue.AddRateLimited(obj)
×
552
                        return fmt.Errorf("error syncing %v: %w, requeuing", obj, err)
×
553
                }
×
554
                c.serviceQueue.Forget(obj)
×
555
                return nil
×
556
        }(obj)
557
        if err != nil {
×
558
                utilruntime.HandleError(err)
×
559
                return true
×
560
        }
×
561
        return true
×
562
}
563

564
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
×
565
        oldPod := oldObj.(*v1.Pod)
×
566
        newPod := newObj.(*v1.Pod)
×
567
        key := cache.MetaObjectToName(newPod).String()
×
568

×
569
        if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
×
570
                oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] ||
×
571
                oldPod.Annotations[util.NetemQosLatencyAnnotation] != newPod.Annotations[util.NetemQosLatencyAnnotation] ||
×
572
                oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
×
573
                oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
×
574
                oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
×
575
                oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
×
576
                oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
×
577
                c.updatePodQueue.Add(key)
×
578
                return
×
579
        }
×
580

581
        attachNets, err := nadutils.ParsePodNetworkAnnotation(newPod)
×
582
        if err != nil {
×
583
                return
×
584
        }
×
585
        for _, multiNet := range attachNets {
×
586
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
587
                if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
588
                        if oldPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)] ||
×
589
                                oldPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)] ||
×
590
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)] ||
×
591
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)] ||
×
592
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
×
593
                                oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
×
594
                                oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
×
595
                                c.updatePodQueue.Add(key)
×
596
                        }
×
597
                }
598
        }
599
}
600

601
func (c *Controller) enqueueDeletePod(obj any) {
×
602
        pod := obj.(*v1.Pod)
×
603
        key := cache.MetaObjectToName(pod).String()
×
604
        c.deletePodQueue.Add(key)
×
605
}
×
606

607
func (c *Controller) runUpdatePodWorker() {
×
608
        for c.processNextUpdatePodWorkItem() {
×
609
        }
×
610
}
611

612
func (c *Controller) runDeletePodWorker() {
×
613
        for c.processNextDeletePodWorkItem() {
×
614
        }
×
615
}
616

617
func (c *Controller) processNextUpdatePodWorkItem() bool {
×
618
        key, shutdown := c.updatePodQueue.Get()
×
619
        if shutdown {
×
620
                return false
×
621
        }
×
622

623
        err := func(key string) error {
×
624
                defer c.updatePodQueue.Done(key)
×
625
                if err := c.handleUpdatePod(key); err != nil {
×
626
                        c.updatePodQueue.AddRateLimited(key)
×
627
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
628
                }
×
629
                c.updatePodQueue.Forget(key)
×
630
                return nil
×
631
        }(key)
632
        if err != nil {
×
633
                utilruntime.HandleError(err)
×
634
                return true
×
635
        }
×
636
        return true
×
637
}
638

639
func (c *Controller) processNextDeletePodWorkItem() bool {
×
640
        key, shutdown := c.deletePodQueue.Get()
×
641
        if shutdown {
×
642
                return false
×
643
        }
×
644

645
        err := func(key string) error {
×
646
                defer c.deletePodQueue.Done(key)
×
647
                if err := c.handleDeletePod(key); err != nil {
×
648
                        c.deletePodQueue.AddRateLimited(key)
×
649
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
650
                }
×
651
                c.deletePodQueue.Forget(key)
×
652
                return nil
×
653
        }(key)
654
        if err != nil {
×
655
                utilruntime.HandleError(err)
×
656
                return true
×
657
        }
×
658
        return true
×
659
}
660

661
var lastNoPodOvsPort map[string]bool
662

663
func (c *Controller) markAndCleanInternalPort() error {
×
664
        klog.V(4).Infof("start to gc ovs internal ports")
×
665
        residualPorts := ovs.GetResidualInternalPorts()
×
666
        if len(residualPorts) == 0 {
×
667
                return nil
×
668
        }
×
669

670
        noPodOvsPort := map[string]bool{}
×
671
        for _, portName := range residualPorts {
×
672
                if !lastNoPodOvsPort[portName] {
×
673
                        noPodOvsPort[portName] = true
×
674
                } else {
×
675
                        klog.Infof("gc ovs internal port %s", portName)
×
676
                        // Remove ovs port
×
677
                        output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName)
×
678
                        if err != nil {
×
679
                                return fmt.Errorf("failed to delete ovs port %w, %q", err, output)
×
680
                        }
×
681
                }
682
        }
683
        lastNoPodOvsPort = noPodOvsPort
×
684

×
685
        return nil
×
686
}
687

NEW
688
func (c *Controller) runIPSecWorker() {
×
NEW
689
        if err := c.StartIPSecService(); err != nil {
×
NEW
690
                klog.Errorf("starting ipsec service: %v", err)
×
NEW
691
                return
×
NEW
692
        }
×
693

NEW
694
        c.ipsecQueue.AddRateLimited("")
×
NEW
695

×
NEW
696
        for c.processNextIPSecWorkItem() {
×
NEW
697
        }
×
698
}
699

NEW
700
func (c *Controller) processNextIPSecWorkItem() bool {
×
NEW
701
        key, shutdown := c.ipsecQueue.Get()
×
NEW
702
        if shutdown {
×
NEW
703
                return false
×
NEW
704
        }
×
NEW
705
        defer c.ipsecQueue.Done(key)
×
NEW
706

×
NEW
707
        err := func(key string) error {
×
NEW
708
                if err := c.SyncIPSecKeys(); err != nil {
×
NEW
709
                        c.ipsecQueue.AddRateLimited(key)
×
NEW
710
                        return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
×
NEW
711
                }
×
NEW
712
                c.ipsecQueue.Forget(key)
×
NEW
713
                return nil
×
714
        }(key)
NEW
715
        if err != nil {
×
NEW
716
                utilruntime.HandleError(err)
×
NEW
717
                return true
×
NEW
718
        }
×
NEW
719
        return true
×
720
}
721

722
// Run starts the controller's periodic maintenance loops and queue workers,
// then blocks until stopCh is closed.
//
// A failure of the initial ipset setup is treated as fatal
// (util.LogFatalAndExit). When Run returns, the deferred ShutDown calls stop
// every work queue, which makes the queue-worker loops exit.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.addOrUpdateProviderNetworkQueue.ShutDown()
	defer c.deleteProviderNetworkQueue.ShutDown()
	defer c.subnetQueue.ShutDown()
	defer c.serviceQueue.ShutDown()
	defer c.updatePodQueue.ShutDown()
	defer c.deletePodQueue.ShutDown()
	defer c.ipsecQueue.ShutDown()
	// Housekeeping loops that do not depend on ipsets being configured.
	go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
	go wait.Until(recompute, 10*time.Minute, stopCh)
	go wait.Until(rotateLog, 1*time.Hour, stopCh)

	// ipsets must be in place before the workers below start.
	if err := c.setIPSet(); err != nil {
		util.LogFatalAndExit(err, "failed to set ipsets")
	}

	klog.Info("Started workers")
	go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh)
	go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh)
	go wait.Until(c.loopTunnelCheck, 5*time.Second, stopCh)
	// Queue workers: each run* function loops until its queue is shut down;
	// wait.Until restarts it after the period if it returns early.
	go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh)
	go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
	go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
	go wait.Until(c.runSubnetWorker, time.Second, stopCh)
	go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
	go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
	go wait.Until(c.runGateway, 3*time.Second, stopCh)
	go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
	go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)
	go wait.Until(func() {
		if err := c.reconcileRouters(nil); err != nil {
			klog.Errorf("failed to reconcile ovn0 routes: %v", err)
		}
	}, 3*time.Second, stopCh)
	// Residual internal-port GC; a port is only removed after two
	// consecutive passes flag it (see markAndCleanInternalPort).
	go wait.Until(func() {
		if err := c.markAndCleanInternalPort(); err != nil {
			klog.Errorf("gc ovs port error: %v", err)
		}
	}, 5*time.Minute, stopCh)

	if c.config.EnableTProxy {
		// NOTE(review): this goroutine is not tied to stopCh here; its
		// lifetime is whatever StartTProxyForwarding implements.
		go c.StartTProxyForwarding()
		go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
		// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
		// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
		// kubelet to tproxy, if probe success recover the iptable rules
		go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
	} else {
		// TProxy disabled: remove any leftover tproxy configuration.
		c.cleanTProxyConfig()
	}

	if c.config.EnableOVNIPSec {
		go wait.Until(c.runIPSecWorker, 3*time.Second, stopCh)
	} else {
		// IPSec disabled: stop it and clear its resources. Failure is
		// logged only and does not block startup.
		if err := c.StopAndClearIPSecResouce(); err != nil {
			klog.Errorf("stop and clear ipsec resource error: %v", err)
		}
	}

	<-stopCh
	klog.Info("Shutting down workers")
}
786

787
func recompute() {
×
788
        output, err := exec.Command("ovn-appctl", "-t", "ovn-controller", "inc-engine/recompute").CombinedOutput()
×
789
        if err != nil {
×
790
                klog.Errorf("failed to recompute ovn-controller %q", output)
×
791
        }
×
792
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc