• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 15865270751

25 Jun 2025 01:37AM UTC coverage: 21.524% (-0.03%) from 21.55%
15865270751

push

github

web-flow
release vm ip when vm is stopped then deleted (#5390)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

0 of 69 new or added lines in 3 files covered. (0.0%)

3 existing lines in 2 files now uncovered.

10481 of 48694 relevant lines covered (21.52%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.24
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        "github.com/puzpuzpuz/xsync/v4"
11
        "golang.org/x/time/rate"
12
        corev1 "k8s.io/api/core/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/apimachinery/pkg/util/wait"
17
        kubeinformers "k8s.io/client-go/informers"
18
        "k8s.io/client-go/kubernetes/scheme"
19
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
20
        appsv1 "k8s.io/client-go/listers/apps/v1"
21
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
22
        v1 "k8s.io/client-go/listers/core/v1"
23
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
24
        netv1 "k8s.io/client-go/listers/networking/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        "k8s.io/utils/keymutex"
30
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
31
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
32
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
33

34
        "github.com/kubeovn/kube-ovn/pkg/informer"
35

36
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
37
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
38
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
39
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
40
        "github.com/kubeovn/kube-ovn/pkg/ovs"
41
        "github.com/kubeovn/kube-ovn/pkg/util"
42
)
43

44
const controllerAgentName = "kube-ovn-controller"
45

46
const (
47
        logicalSwitchKey              = "ls"
48
        logicalRouterKey              = "lr"
49
        portGroupKey                  = "pg"
50
        networkPolicyKey              = "np"
51
        sgKey                         = "sg"
52
        associatedSgKeyPrefix         = "associated_sg_"
53
        sgsKey                        = "security_groups"
54
        u2oKey                        = "u2o"
55
        adminNetworkPolicyKey         = "anp"
56
        baselineAdminNetworkPolicyKey = "banp"
57
)
58

59
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
60
type Controller struct {
61
        config *Configuration
62

63
        ipam           *ovnipam.IPAM
64
        namedPort      *NamedPort
65
        anpPrioNameMap map[int32]string
66
        anpNamePrioMap map[string]int32
67

68
        OVNNbClient ovs.NbClient
69
        OVNSbClient ovs.SbClient
70

71
        // ExternalGatewayType define external gateway type, centralized
72
        ExternalGatewayType string
73

74
        podsLister             v1.PodLister
75
        podsSynced             cache.InformerSynced
76
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
77
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
78
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
79
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
80
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
81
        podKeyMutex            keymutex.KeyMutex
82

83
        vpcsLister           kubeovnlister.VpcLister
84
        vpcSynced            cache.InformerSynced
85
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
86
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
87
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
88
        vpcKeyMutex          keymutex.KeyMutex
89

90
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
91
        vpcNatGatewaySynced           cache.InformerSynced
92
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
93
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
94
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
95
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
96
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
97
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
98
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
99
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
100
        vpcNatGwKeyMutex              keymutex.KeyMutex
101

102
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
103
        vpcEgressGatewaySynced           cache.InformerSynced
104
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
105
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
106
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
107

108
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
109
        switchLBRuleSynced      cache.InformerSynced
110
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
111
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
112
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
113

114
        vpcDNSLister           kubeovnlister.VpcDnsLister
115
        vpcDNSSynced           cache.InformerSynced
116
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
118

119
        subnetsLister           kubeovnlister.SubnetLister
120
        subnetSynced            cache.InformerSynced
121
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
122
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
123
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
124
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
125
        subnetKeyMutex          keymutex.KeyMutex
126

127
        ippoolLister            kubeovnlister.IPPoolLister
128
        ippoolSynced            cache.InformerSynced
129
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
130
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
131
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
132
        ippoolKeyMutex          keymutex.KeyMutex
133

134
        ipsLister     kubeovnlister.IPLister
135
        ipSynced      cache.InformerSynced
136
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
137
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
138
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
139

140
        virtualIpsLister          kubeovnlister.VipLister
141
        virtualIpsSynced          cache.InformerSynced
142
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
143
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
144
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
145
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
146

147
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
148
        iptablesEipSynced      cache.InformerSynced
149
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
150
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
151
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
152
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
153

154
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
155
        iptablesFipSynced      cache.InformerSynced
156
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
157
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
158
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
159

160
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
161
        iptablesDnatRuleSynced      cache.InformerSynced
162
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
163
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
164
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
165

166
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
167
        iptablesSnatRuleSynced      cache.InformerSynced
168
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
169
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
170
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
171

172
        ovnEipsLister     kubeovnlister.OvnEipLister
173
        ovnEipSynced      cache.InformerSynced
174
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
175
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
176
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
177
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
178

179
        ovnFipsLister     kubeovnlister.OvnFipLister
180
        ovnFipSynced      cache.InformerSynced
181
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
182
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
183
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
184

185
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
186
        ovnSnatRuleSynced      cache.InformerSynced
187
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
188
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
189
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
190

191
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
192
        ovnDnatRuleSynced      cache.InformerSynced
193
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
195
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
196

197
        providerNetworksLister kubeovnlister.ProviderNetworkLister
198
        providerNetworkSynced  cache.InformerSynced
199

200
        vlansLister     kubeovnlister.VlanLister
201
        vlanSynced      cache.InformerSynced
202
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
203
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
204
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
205
        vlanKeyMutex    keymutex.KeyMutex
206

207
        namespacesLister  v1.NamespaceLister
208
        namespacesSynced  cache.InformerSynced
209
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
210
        nsKeyMutex        keymutex.KeyMutex
211

212
        nodesLister     v1.NodeLister
213
        nodesSynced     cache.InformerSynced
214
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
215
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
216
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
217
        nodeKeyMutex    keymutex.KeyMutex
218

219
        servicesLister     v1.ServiceLister
220
        serviceSynced      cache.InformerSynced
221
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
222
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
223
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
224
        svcKeyMutex        keymutex.KeyMutex
225

226
        endpointSlicesLister          discoveryv1.EndpointSliceLister
227
        endpointSlicesSynced          cache.InformerSynced
228
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
229
        epKeyMutex                    keymutex.KeyMutex
230

231
        deploymentsLister appsv1.DeploymentLister
232
        deploymentsSynced cache.InformerSynced
233

234
        npsLister     netv1.NetworkPolicyLister
235
        npsSynced     cache.InformerSynced
236
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
237
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
238
        npKeyMutex    keymutex.KeyMutex
239

240
        sgsLister          kubeovnlister.SecurityGroupLister
241
        sgSynced           cache.InformerSynced
242
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
243
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
244
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
245
        sgKeyMutex         keymutex.KeyMutex
246

247
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
248
        qosPolicySynced      cache.InformerSynced
249
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
250
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
251
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
252

253
        configMapsLister v1.ConfigMapLister
254
        configMapsSynced cache.InformerSynced
255

256
        anpsLister     anplister.AdminNetworkPolicyLister
257
        anpsSynced     cache.InformerSynced
258
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
259
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
260
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
261
        anpKeyMutex    keymutex.KeyMutex
262

263
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
264
        banpsSynced     cache.InformerSynced
265
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
266
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
267
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
268
        banpKeyMutex    keymutex.KeyMutex
269

270
        csrLister           certListerv1.CertificateSigningRequestLister
271
        csrSynced           cache.InformerSynced
272
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
273

274
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
275
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
276
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
277

278
        recorder               record.EventRecorder
279
        informerFactory        kubeinformers.SharedInformerFactory
280
        cmInformerFactory      kubeinformers.SharedInformerFactory
281
        deployInformerFactory  kubeinformers.SharedInformerFactory
282
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
283
        anpInformerFactory     anpinformer.SharedInformerFactory
284

285
        // Database health check
286
        dbFailureCount int
287
}
288

289
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
290
        if rateLimiter == nil {
2✔
291
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
292
        }
1✔
293
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
294
}
295

296
// Run creates and runs a new ovn controller
297
func Run(ctx context.Context, config *Configuration) {
×
298
        klog.V(4).Info("Creating event broadcaster")
×
299
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
300
        eventBroadcaster.StartLogging(klog.Infof)
×
301
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
302
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
303
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
304
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
305
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
306
        )
×
307

×
308
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
309
        if err != nil {
×
310
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
311
        }
×
312

313
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
314
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
315
                        listOption.AllowWatchBookmarks = true
×
316
                }))
×
317
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
318
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
319
                        listOption.AllowWatchBookmarks = true
×
320
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
321
        // deployment informer used to list/watch vpc egress gateway workloads
322
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
323
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
324
                        listOption.AllowWatchBookmarks = true
×
325
                        listOption.LabelSelector = selector.String()
×
326
                }))
×
327
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
328
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
329
                        listOption.AllowWatchBookmarks = true
×
330
                }))
×
331
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
332
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
333
                        listOption.AllowWatchBookmarks = true
×
334
                }))
×
335

336
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
337

×
338
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
339
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
340
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
341
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
342
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
343
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
344
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
345
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
346
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
347
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
348
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
349
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
350
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
351
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
352
        podInformer := informerFactory.Core().V1().Pods()
×
353
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
354
        nodeInformer := informerFactory.Core().V1().Nodes()
×
355
        serviceInformer := informerFactory.Core().V1().Services()
×
356
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
357
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
358
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
359
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
360
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
361
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
362
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
363
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
364
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
365
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
366
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
367
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
368
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
369
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
370

×
371
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
372
        controller := &Controller{
×
373
                config:             config,
×
374
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
375
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
376
                ipam:               ovnipam.NewIPAM(),
×
377
                namedPort:          NewNamedPort(),
×
378

×
379
                vpcsLister:           vpcInformer.Lister(),
×
380
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
381
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
382
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
383
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
384
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
385

×
386
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
387
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
388
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
389
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
390
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
391
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
392
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
393
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
394
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
395
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
396
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
397

×
398
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
399
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
400
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
401
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
402
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
403

×
404
                subnetsLister:           subnetInformer.Lister(),
×
405
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
406
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
407
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
408
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
409
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
410
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
411

×
412
                ippoolLister:            ippoolInformer.Lister(),
×
413
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
414
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
415
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
416
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
417
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
418

×
419
                ipsLister:     ipInformer.Lister(),
×
420
                ipSynced:      ipInformer.Informer().HasSynced,
×
421
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
422
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
423
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
424

×
425
                virtualIpsLister:          virtualIPInformer.Lister(),
×
426
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
427
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
428
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
429
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
430
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
431

×
432
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
433
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
434
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
435
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
436
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
437
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
438

×
439
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
440
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
441
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
442
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
443
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
444

×
445
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
446
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
447
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
448
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
449
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
450

×
451
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
452
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
453
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
454
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
455
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
456

×
457
                vlansLister:     vlanInformer.Lister(),
×
458
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
459
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
460
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
461
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
462
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
463

×
464
                providerNetworksLister: providerNetworkInformer.Lister(),
×
465
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
466

×
467
                podsLister:          podInformer.Lister(),
×
468
                podsSynced:          podInformer.Informer().HasSynced,
×
469
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
470
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
471
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
472
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
473
                                Name:          "DeletePod",
×
474
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
475
                        },
×
476
                ),
×
477
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
478
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
479

×
480
                namespacesLister:  namespaceInformer.Lister(),
×
481
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
482
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
483
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
484

×
485
                nodesLister:     nodeInformer.Lister(),
×
486
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
487
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
488
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
489
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
490
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
491

×
492
                servicesLister:     serviceInformer.Lister(),
×
493
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
494
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
495
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
496
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
497
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
498

×
499
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
500
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
501
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
502
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
503

×
504
                deploymentsLister: deploymentInformer.Lister(),
×
505
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
506

×
507
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
508
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
509
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
510
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
511
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
512

×
513
                configMapsLister: configMapInformer.Lister(),
×
514
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
515

×
516
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
517
                sgsLister:          sgInformer.Lister(),
×
518
                sgSynced:           sgInformer.Informer().HasSynced,
×
519
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
520
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
521
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
522

×
523
                ovnEipsLister:     ovnEipInformer.Lister(),
×
524
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
525
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
526
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
527
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
528
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
529

×
530
                ovnFipsLister:     ovnFipInformer.Lister(),
×
531
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
532
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
533
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
534
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
535

×
536
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
537
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
538
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
539
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
540
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
541

×
542
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
543
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
544
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
545
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
546
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
547

×
548
                csrLister:           csrInformer.Lister(),
×
549
                csrSynced:           csrInformer.Informer().HasSynced,
×
550
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
551

×
552
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
NEW
553
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
554
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
555

×
556
                recorder:               recorder,
×
557
                informerFactory:        informerFactory,
×
558
                cmInformerFactory:      cmInformerFactory,
×
559
                deployInformerFactory:  deployInformerFactory,
×
560
                kubeovnInformerFactory: kubeovnInformerFactory,
×
561
                anpInformerFactory:     anpInformerFactory,
×
562
        }
×
563

×
564
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
565
                config.OvnNbAddr,
×
566
                config.OvnTimeout,
×
567
                config.OvsDbConnectTimeout,
×
568
                config.OvsDbInactivityTimeout,
×
569
                config.OvsDbConnectMaxRetry,
×
570
        ); err != nil {
×
571
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
572
        }
×
573
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
574
                config.OvnSbAddr,
×
575
                config.OvnTimeout,
×
576
                config.OvsDbConnectTimeout,
×
577
                config.OvsDbInactivityTimeout,
×
578
                config.OvsDbConnectMaxRetry,
×
579
        ); err != nil {
×
580
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
581
        }
×
582
        if config.EnableLb {
×
583
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
584
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
585
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
586
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
587
                        "DeleteSwitchLBRule",
×
588
                        workqueue.NewTypedMaxOfRateLimiter(
×
589
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
590
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
591
                        ),
×
592
                )
×
593
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
594
                        "UpdateSwitchLBRule",
×
595
                        workqueue.NewTypedMaxOfRateLimiter(
×
596
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
597
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
598
                        ),
×
599
                )
×
600

×
601
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
602
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
603
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
604
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
605
        }
×
606

607
        if config.EnableNP {
×
608
                controller.npsLister = npInformer.Lister()
×
609
                controller.npsSynced = npInformer.Informer().HasSynced
×
610
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
611
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
612
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
613
        }
×
614

615
        if config.EnableANP {
×
616
                controller.anpsLister = anpInformer.Lister()
×
617
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
618
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
619
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
620
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
621
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
622

×
623
                controller.banpsLister = banpInformer.Lister()
×
624
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
625
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
626
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
627
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
628
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
629
        }
×
630

631
        defer controller.shutdown()
×
632
        klog.Info("Starting OVN controller")
×
633

×
634
        // Wait for the caches to be synced before starting workers
×
635
        controller.informerFactory.Start(ctx.Done())
×
636
        controller.cmInformerFactory.Start(ctx.Done())
×
637
        controller.deployInformerFactory.Start(ctx.Done())
×
638
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
639
        controller.anpInformerFactory.Start(ctx.Done())
×
NEW
640
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
641

×
642
        klog.Info("Waiting for informer caches to sync")
×
643
        cacheSyncs := []cache.InformerSynced{
×
644
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
645
                controller.vpcSynced, controller.subnetSynced,
×
646
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
647
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
648
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
649
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
650
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
651
                controller.ovnDnatRuleSynced,
×
652
        }
×
653
        if controller.config.EnableLb {
×
654
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
655
        }
×
656
        if controller.config.EnableNP {
×
657
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
658
        }
×
659
        if controller.config.EnableANP {
×
660
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
661
        }
×
662

663
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
664
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
665
        }
×
666

667
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
668
                AddFunc:    controller.enqueueAddPod,
×
669
                DeleteFunc: controller.enqueueDeletePod,
×
670
                UpdateFunc: controller.enqueueUpdatePod,
×
671
        }); err != nil {
×
672
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
673
        }
×
674

675
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
676
                AddFunc:    controller.enqueueAddNamespace,
×
677
                UpdateFunc: controller.enqueueUpdateNamespace,
×
678
                DeleteFunc: controller.enqueueDeleteNamespace,
×
679
        }); err != nil {
×
680
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
681
        }
×
682

683
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
684
                AddFunc:    controller.enqueueAddNode,
×
685
                UpdateFunc: controller.enqueueUpdateNode,
×
686
                DeleteFunc: controller.enqueueDeleteNode,
×
687
        }); err != nil {
×
688
                util.LogFatalAndExit(err, "failed to add node event handler")
×
689
        }
×
690

691
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
692
                AddFunc:    controller.enqueueAddService,
×
693
                DeleteFunc: controller.enqueueDeleteService,
×
694
                UpdateFunc: controller.enqueueUpdateService,
×
695
        }); err != nil {
×
696
                util.LogFatalAndExit(err, "failed to add service event handler")
×
697
        }
×
698

699
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
700
                AddFunc:    controller.enqueueAddEndpointSlice,
×
701
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
702
        }); err != nil {
×
703
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
704
        }
×
705

706
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
707
                AddFunc:    controller.enqueueAddDeployment,
×
708
                UpdateFunc: controller.enqueueUpdateDeployment,
×
709
        }); err != nil {
×
710
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
711
        }
×
712

713
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
714
                AddFunc:    controller.enqueueAddVpc,
×
715
                UpdateFunc: controller.enqueueUpdateVpc,
×
716
                DeleteFunc: controller.enqueueDelVpc,
×
717
        }); err != nil {
×
718
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
719
        }
×
720

721
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
722
                AddFunc:    controller.enqueueAddVpcNatGw,
×
723
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
724
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
725
        }); err != nil {
×
726
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
727
        }
×
728

729
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
730
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
731
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
732
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
733
        }); err != nil {
×
734
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
735
        }
×
736

737
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
738
                AddFunc:    controller.enqueueAddSubnet,
×
739
                UpdateFunc: controller.enqueueUpdateSubnet,
×
740
                DeleteFunc: controller.enqueueDeleteSubnet,
×
741
        }); err != nil {
×
742
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
743
        }
×
744

745
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
746
                AddFunc:    controller.enqueueAddIPPool,
×
747
                UpdateFunc: controller.enqueueUpdateIPPool,
×
748
                DeleteFunc: controller.enqueueDeleteIPPool,
×
749
        }); err != nil {
×
750
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
751
        }
×
752

753
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
754
                AddFunc:    controller.enqueueAddIP,
×
755
                UpdateFunc: controller.enqueueUpdateIP,
×
756
                DeleteFunc: controller.enqueueDelIP,
×
757
        }); err != nil {
×
758
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
759
        }
×
760

761
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
762
                AddFunc:    controller.enqueueAddVlan,
×
763
                DeleteFunc: controller.enqueueDelVlan,
×
764
                UpdateFunc: controller.enqueueUpdateVlan,
×
765
        }); err != nil {
×
766
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
767
        }
×
768

769
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
770
                AddFunc:    controller.enqueueAddSg,
×
771
                DeleteFunc: controller.enqueueDeleteSg,
×
772
                UpdateFunc: controller.enqueueUpdateSg,
×
773
        }); err != nil {
×
774
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
775
        }
×
776

777
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
778
                AddFunc:    controller.enqueueAddVirtualIP,
×
779
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
780
                DeleteFunc: controller.enqueueDelVirtualIP,
×
781
        }); err != nil {
×
782
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
783
        }
×
784

785
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
786
                AddFunc:    controller.enqueueAddIptablesEip,
×
787
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
788
                DeleteFunc: controller.enqueueDelIptablesEip,
×
789
        }); err != nil {
×
790
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
791
        }
×
792

793
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
794
                AddFunc:    controller.enqueueAddIptablesFip,
×
795
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
796
                DeleteFunc: controller.enqueueDelIptablesFip,
×
797
        }); err != nil {
×
798
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
799
        }
×
800

801
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
802
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
803
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
804
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
805
        }); err != nil {
×
806
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
807
        }
×
808

809
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
810
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
811
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
812
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
813
        }); err != nil {
×
814
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
815
        }
×
816

817
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
818
                AddFunc:    controller.enqueueAddOvnEip,
×
819
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
820
                DeleteFunc: controller.enqueueDelOvnEip,
×
821
        }); err != nil {
×
822
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
823
        }
×
824

825
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
826
                AddFunc:    controller.enqueueAddOvnFip,
×
827
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
828
                DeleteFunc: controller.enqueueDelOvnFip,
×
829
        }); err != nil {
×
830
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
831
        }
×
832

833
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
834
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
835
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
836
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
837
        }); err != nil {
×
838
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
839
        }
×
840

841
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
842
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
843
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
844
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
845
        }); err != nil {
×
846
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
847
        }
×
848

849
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
850
                AddFunc:    controller.enqueueAddQoSPolicy,
×
851
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
852
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
853
        }); err != nil {
×
854
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
855
        }
×
856

857
        if config.EnableLb {
×
858
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
859
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
860
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
861
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
862
                }); err != nil {
×
863
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
864
                }
×
865

866
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
867
                        AddFunc:    controller.enqueueAddVpcDNS,
×
868
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
869
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
870
                }); err != nil {
×
871
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
872
                }
×
873
        }
874

875
        if config.EnableNP {
×
876
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
877
                        AddFunc:    controller.enqueueAddNp,
×
878
                        UpdateFunc: controller.enqueueUpdateNp,
×
879
                        DeleteFunc: controller.enqueueDeleteNp,
×
880
                }); err != nil {
×
881
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
882
                }
×
883
        }
884

885
        if config.EnableANP {
×
886
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
887
                        AddFunc:    controller.enqueueAddAnp,
×
888
                        UpdateFunc: controller.enqueueUpdateAnp,
×
889
                        DeleteFunc: controller.enqueueDeleteAnp,
×
890
                }); err != nil {
×
891
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
892
                }
×
893

894
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
895
                        AddFunc:    controller.enqueueAddBanp,
×
896
                        UpdateFunc: controller.enqueueUpdateBanp,
×
897
                        DeleteFunc: controller.enqueueDeleteBanp,
×
898
                }); err != nil {
×
899
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
900
                }
×
901

902
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
903
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
904
        }
905

906
        if config.EnableOVNIPSec {
×
907
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
908
                        AddFunc:    controller.enqueueAddCsr,
×
909
                        UpdateFunc: controller.enqueueUpdateCsr,
×
910
                        // no need to add delete func for csr
×
911
                }); err != nil {
×
912
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
913
                }
×
914
        }
915

916
        controller.Run(ctx)
×
917
}
918

919
// Run will set up the event handlers for types we are interested in, as well
920
// as syncing informer caches and starting workers. It will block until stopCh
921
// is closed, at which point it will shutdown the workqueue and wait for
922
// workers to finish processing their current work items.
923
func (c *Controller) Run(ctx context.Context) {
×
924
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
925
        // Otherwise, the init process should be placed after all workers have already started working
×
926
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
927
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
928
        }
×
929

930
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
931
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
932
        }
×
933

934
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
935
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
936
        }
×
937

938
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
939
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
940
        }
×
941

942
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
943
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
944
        }
×
945

946
        if err := c.InitOVN(); err != nil {
×
947
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
948
        }
×
949

950
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
951
        if err := c.syncIPCR(); err != nil {
×
952
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
953
        }
×
954

955
        if err := c.syncFinalizers(); err != nil {
×
956
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
957
        }
×
958

959
        if err := c.InitIPAM(); err != nil {
×
960
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
961
        }
×
962

963
        if err := c.syncNodeRoutes(); err != nil {
×
964
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
965
        }
×
966

967
        if err := c.syncSubnetCR(); err != nil {
×
968
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
969
        }
×
970

971
        if err := c.syncVlanCR(); err != nil {
×
972
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
973
        }
×
974

975
        if c.config.EnableOVNIPSec {
×
976
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
977
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
978
                }
×
979
        }
980

981
        // start workers to do all the network operations
982
        c.startWorkers(ctx)
×
983

×
984
        c.initResourceOnce()
×
985
        <-ctx.Done()
×
986
        klog.Info("Shutting down workers")
×
987
}
988

989
func (c *Controller) dbStatus() {
×
990
        const maxFailures = 5
×
991

×
992
        done := make(chan error, 2)
×
993
        go func() {
×
994
                done <- c.OVNNbClient.Echo(context.Background())
×
995
        }()
×
996
        go func() {
×
997
                done <- c.OVNSbClient.Echo(context.Background())
×
998
        }()
×
999

1000
        resultsReceived := 0
×
1001
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1002

×
1003
        for resultsReceived < 2 {
×
1004
                select {
×
1005
                case err := <-done:
×
1006
                        resultsReceived++
×
1007
                        if err != nil {
×
1008
                                c.dbFailureCount++
×
1009
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1010
                                if c.dbFailureCount >= maxFailures {
×
1011
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1012
                                }
×
1013
                                return
×
1014
                        }
1015
                case <-timeout:
×
1016
                        c.dbFailureCount++
×
1017
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1018
                        if c.dbFailureCount >= maxFailures {
×
1019
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1020
                        }
×
1021
                        return
×
1022
                }
1023
        }
1024

1025
        if c.dbFailureCount > 0 {
×
1026
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1027
                c.dbFailureCount = 0
×
1028
        }
×
1029
}
1030

1031
func (c *Controller) shutdown() {
×
1032
        utilruntime.HandleCrash()
×
1033

×
1034
        c.addOrUpdatePodQueue.ShutDown()
×
1035
        c.deletePodQueue.ShutDown()
×
1036
        c.updatePodSecurityQueue.ShutDown()
×
1037

×
1038
        c.addNamespaceQueue.ShutDown()
×
1039

×
1040
        c.addOrUpdateSubnetQueue.ShutDown()
×
1041
        c.deleteSubnetQueue.ShutDown()
×
1042
        c.updateSubnetStatusQueue.ShutDown()
×
1043
        c.syncVirtualPortsQueue.ShutDown()
×
1044

×
1045
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1046
        c.updateIPPoolStatusQueue.ShutDown()
×
1047
        c.deleteIPPoolQueue.ShutDown()
×
1048

×
1049
        c.addNodeQueue.ShutDown()
×
1050
        c.updateNodeQueue.ShutDown()
×
1051
        c.deleteNodeQueue.ShutDown()
×
1052

×
1053
        c.addServiceQueue.ShutDown()
×
1054
        c.deleteServiceQueue.ShutDown()
×
1055
        c.updateServiceQueue.ShutDown()
×
1056
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1057

×
1058
        c.addVlanQueue.ShutDown()
×
1059
        c.delVlanQueue.ShutDown()
×
1060
        c.updateVlanQueue.ShutDown()
×
1061

×
1062
        c.addOrUpdateVpcQueue.ShutDown()
×
1063
        c.updateVpcStatusQueue.ShutDown()
×
1064
        c.delVpcQueue.ShutDown()
×
1065

×
1066
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1067
        c.initVpcNatGatewayQueue.ShutDown()
×
1068
        c.delVpcNatGatewayQueue.ShutDown()
×
1069
        c.updateVpcEipQueue.ShutDown()
×
1070
        c.updateVpcFloatingIPQueue.ShutDown()
×
1071
        c.updateVpcDnatQueue.ShutDown()
×
1072
        c.updateVpcSnatQueue.ShutDown()
×
1073
        c.updateVpcSubnetQueue.ShutDown()
×
1074

×
1075
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1076
        c.delVpcEgressGatewayQueue.ShutDown()
×
1077

×
1078
        if c.config.EnableLb {
×
1079
                c.addSwitchLBRuleQueue.ShutDown()
×
1080
                c.delSwitchLBRuleQueue.ShutDown()
×
1081
                c.updateSwitchLBRuleQueue.ShutDown()
×
1082

×
1083
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1084
                c.delVpcDNSQueue.ShutDown()
×
1085
        }
×
1086

1087
        c.addIPQueue.ShutDown()
×
1088
        c.updateIPQueue.ShutDown()
×
1089
        c.delIPQueue.ShutDown()
×
1090

×
1091
        c.addVirtualIPQueue.ShutDown()
×
1092
        c.updateVirtualIPQueue.ShutDown()
×
1093
        c.updateVirtualParentsQueue.ShutDown()
×
1094
        c.delVirtualIPQueue.ShutDown()
×
1095

×
1096
        c.addIptablesEipQueue.ShutDown()
×
1097
        c.updateIptablesEipQueue.ShutDown()
×
1098
        c.resetIptablesEipQueue.ShutDown()
×
1099
        c.delIptablesEipQueue.ShutDown()
×
1100

×
1101
        c.addIptablesFipQueue.ShutDown()
×
1102
        c.updateIptablesFipQueue.ShutDown()
×
1103
        c.delIptablesFipQueue.ShutDown()
×
1104

×
1105
        c.addIptablesDnatRuleQueue.ShutDown()
×
1106
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1107
        c.delIptablesDnatRuleQueue.ShutDown()
×
1108

×
1109
        c.addIptablesSnatRuleQueue.ShutDown()
×
1110
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1111
        c.delIptablesSnatRuleQueue.ShutDown()
×
1112

×
1113
        c.addQoSPolicyQueue.ShutDown()
×
1114
        c.updateQoSPolicyQueue.ShutDown()
×
1115
        c.delQoSPolicyQueue.ShutDown()
×
1116

×
1117
        c.addOvnEipQueue.ShutDown()
×
1118
        c.updateOvnEipQueue.ShutDown()
×
1119
        c.resetOvnEipQueue.ShutDown()
×
1120
        c.delOvnEipQueue.ShutDown()
×
1121

×
1122
        c.addOvnFipQueue.ShutDown()
×
1123
        c.updateOvnFipQueue.ShutDown()
×
1124
        c.delOvnFipQueue.ShutDown()
×
1125

×
1126
        c.addOvnSnatRuleQueue.ShutDown()
×
1127
        c.updateOvnSnatRuleQueue.ShutDown()
×
1128
        c.delOvnSnatRuleQueue.ShutDown()
×
1129

×
1130
        c.addOvnDnatRuleQueue.ShutDown()
×
1131
        c.updateOvnDnatRuleQueue.ShutDown()
×
1132
        c.delOvnDnatRuleQueue.ShutDown()
×
1133

×
1134
        if c.config.EnableNP {
×
1135
                c.updateNpQueue.ShutDown()
×
1136
                c.deleteNpQueue.ShutDown()
×
1137
        }
×
1138
        if c.config.EnableANP {
×
1139
                c.addAnpQueue.ShutDown()
×
1140
                c.updateAnpQueue.ShutDown()
×
1141
                c.deleteAnpQueue.ShutDown()
×
1142

×
1143
                c.addBanpQueue.ShutDown()
×
1144
                c.updateBanpQueue.ShutDown()
×
1145
                c.deleteBanpQueue.ShutDown()
×
1146
        }
×
1147

1148
        c.addOrUpdateSgQueue.ShutDown()
×
1149
        c.delSgQueue.ShutDown()
×
1150
        c.syncSgPortsQueue.ShutDown()
×
1151

×
1152
        c.addOrUpdateCsrQueue.ShutDown()
×
1153

×
1154
        if c.config.EnableLiveMigrationOptimize {
×
1155
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1156
        }
×
1157
}
1158

1159
func (c *Controller) startWorkers(ctx context.Context) {
×
1160
        klog.Info("Starting workers")
×
1161

×
1162
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1163
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1164
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1165

×
1166
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1167
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1168
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1169
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1170
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1171
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1172
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1173
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1174
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1175
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1176
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1177
        // add default and join subnet and wait them ready
×
1178
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1179
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1180
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1181
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1182
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1183
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1184
                klog.Infof("wait for subnets %v ready", subnets)
×
1185

×
1186
                return c.allSubnetReady(subnets...)
×
1187
        })
×
1188
        if err != nil {
×
1189
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1190
        }
×
1191

1192
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1193
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1194
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1195

×
1196
        // run node worker before handle any pods
×
1197
        for range c.config.WorkerNum {
×
1198
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1199
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1200
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1201
        }
×
1202
        for {
×
1203
                ready := true
×
1204
                time.Sleep(3 * time.Second)
×
1205
                nodes, err := c.nodesLister.List(labels.Everything())
×
1206
                if err != nil {
×
1207
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1208
                }
×
1209
                for _, node := range nodes {
×
1210
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1211
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1212
                                ready = false
×
1213
                                break
×
1214
                        }
1215
                }
1216
                if ready {
×
1217
                        break
×
1218
                }
1219
        }
1220

1221
        if c.config.EnableLb {
×
1222
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1223
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1224
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1225

×
1226
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1227
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1228
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1229

×
1230
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1231
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1232
                go wait.Until(func() {
×
1233
                        c.resyncVpcDNSConfig()
×
1234
                }, 5*time.Second, ctx.Done())
×
1235
        }
1236

1237
        for range c.config.WorkerNum {
×
1238
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1239
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1240
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1241

×
1242
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1243
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1244
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1245
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1246
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1247

×
1248
                if c.config.EnableLb {
×
1249
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1250
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1251
                }
×
1252

1253
                if c.config.EnableNP {
×
1254
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1255
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1256
                }
×
1257

1258
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1259
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1260
        }
1261

1262
        if c.config.EnableEipSnat {
×
1263
                go wait.Until(func() {
×
1264
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1265
                        c.resyncExternalGateway()
×
1266
                }, time.Second, ctx.Done())
×
1267

1268
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1269
                c.OVNNbClient.MonitorBFD()
×
1270
        }
1271
        // TODO: we should merge these two vpc nat config into one config and resync them together
1272
        go wait.Until(func() {
×
1273
                c.resyncVpcNatGwConfig()
×
1274
        }, time.Second, ctx.Done())
×
1275

1276
        go wait.Until(func() {
×
1277
                c.resyncVpcNatConfig()
×
1278
        }, time.Second, ctx.Done())
×
1279

1280
        if c.config.GCInterval != 0 {
×
1281
                go wait.Until(func() {
×
1282
                        if err := c.markAndCleanLSP(); err != nil {
×
1283
                                klog.Errorf("gc lsp error: %v", err)
×
1284
                        }
×
1285
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1286
        }
1287

1288
        go wait.Until(func() {
×
1289
                if err := c.inspectPod(); err != nil {
×
1290
                        klog.Errorf("inspection error: %v", err)
×
1291
                }
×
1292
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1293

1294
        if c.config.EnableExternalVpc {
×
1295
                go wait.Until(func() {
×
1296
                        c.syncExternalVpc()
×
1297
                }, 5*time.Second, ctx.Done())
×
1298
        }
1299

1300
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1301
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1302
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1303

×
1304
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1305
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1306
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1307
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1308

×
1309
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1310
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1311
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1312

×
1313
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1314
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1315
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1316

×
1317
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1318
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1319
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1320

×
1321
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1322

×
1323
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1324
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1325
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1326

×
1327
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1330
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1331

×
1332
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1334
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1335
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1336

×
1337
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1338
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1339
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1340

×
1341
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1344

×
1345
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1348

×
1349
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1350
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1351
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1352

×
1353
        if c.config.EnableANP {
×
1354
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1355
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1356
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1357

×
1358
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1359
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1360
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1361
        }
×
1362

1363
        if c.config.EnableLiveMigrationOptimize {
×
1364
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1365
        }
×
1366

NEW
1367
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
NEW
1368

×
UNCOV
1369
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1370
}
1371

1372
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1373
        for _, lsName := range subnets {
2✔
1374
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1375
                if err != nil {
1✔
1376
                        klog.Error(err)
×
1377
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1378
                }
×
1379

1380
                if !exist {
2✔
1381
                        return false, nil
1✔
1382
                }
1✔
1383
        }
1384

1385
        return true, nil
1✔
1386
}
1387

1388
func (c *Controller) initResourceOnce() {
×
1389
        c.registerSubnetMetrics()
×
1390

×
1391
        if err := c.initNodeChassis(); err != nil {
×
1392
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1393
        }
×
1394

1395
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1396
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1397
        }
×
1398
        if err := c.syncSecurityGroup(); err != nil {
×
1399
                util.LogFatalAndExit(err, "failed to sync security group")
×
1400
        }
×
1401

1402
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1403
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1404
        }
×
1405

1406
        if err := c.initVpcNatGw(); err != nil {
×
1407
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1408
        }
×
1409
        if c.config.EnableLb {
×
1410
                if err := c.initVpcDNSConfig(); err != nil {
×
1411
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1412
                }
×
1413
        }
1414

1415
        // remove resources in ovndb that not exist any more in kubernetes resources
1416
        // process gc at last in case of affecting other init process
1417
        if err := c.gc(); err != nil {
×
1418
                util.LogFatalAndExit(err, "failed to run gc")
×
1419
        }
×
1420
}
1421

1422
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1423
        item, shutdown := queue.Get()
×
1424
        if shutdown {
×
1425
                return false
×
1426
        }
×
1427

1428
        err := func(item T) error {
×
1429
                defer queue.Done(item)
×
1430
                if err := handler(item); err != nil {
×
1431
                        queue.AddRateLimited(item)
×
1432
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1433
                }
×
1434
                queue.Forget(item)
×
1435
                return nil
×
1436
        }(item)
1437
        if err != nil {
×
1438
                utilruntime.HandleError(err)
×
1439
                return true
×
1440
        }
×
1441
        return true
×
1442
}
1443

1444
func getWorkItemKey(obj any) string {
×
1445
        switch v := obj.(type) {
×
1446
        case string:
×
1447
                return v
×
1448
        case *vpcService:
×
1449
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1450
        case *AdminNetworkPolicyChangedDelta:
×
1451
                return v.key
×
1452
        case *SlrInfo:
×
1453
                return v.Name
×
1454
        default:
×
1455
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1456
                if err != nil {
×
1457
                        utilruntime.HandleError(err)
×
1458
                        return ""
×
1459
                }
×
1460
                return key
×
1461
        }
1462
}
1463

1464
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1465
        return func() {
×
1466
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1467
                }
×
1468
        }
1469
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc