• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 14547474456

19 Apr 2025 08:37AM UTC coverage: 21.73% (-0.007%) from 21.737%
14547474456

push

github

web-flow
modernize: simplify code by using modern constructs (#5163)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

87 of 376 new or added lines in 94 files covered. (23.14%)

1 existing line in 1 file now uncovered.

10251 of 47175 relevant lines covered (21.73%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.28
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        "github.com/puzpuzpuz/xsync/v3"
11
        "golang.org/x/time/rate"
12
        corev1 "k8s.io/api/core/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/apimachinery/pkg/util/wait"
17
        kubeinformers "k8s.io/client-go/informers"
18
        "k8s.io/client-go/kubernetes/scheme"
19
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
20
        appsv1 "k8s.io/client-go/listers/apps/v1"
21
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
22
        v1 "k8s.io/client-go/listers/core/v1"
23
        netv1 "k8s.io/client-go/listers/networking/v1"
24
        "k8s.io/client-go/tools/cache"
25
        "k8s.io/client-go/tools/record"
26
        "k8s.io/client-go/util/workqueue"
27
        "k8s.io/klog/v2"
28
        "k8s.io/utils/keymutex"
29
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
30
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
31
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
32

33
        "github.com/kubeovn/kube-ovn/pkg/informer"
34

35
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
36
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
37
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
38
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
39
        "github.com/kubeovn/kube-ovn/pkg/ovs"
40
        "github.com/kubeovn/kube-ovn/pkg/util"
41
)
42

43
// controllerAgentName is the component name this controller reports as the
// source of the Kubernetes events it records.
const controllerAgentName = "kube-ovn-controller"

// Short string keys shared across the controller package. Their consumers are
// not visible in this part of the file; presumably they tag OVN objects with
// the kind of resource that owns them — confirm against the callers.
const (
	logicalSwitchKey              = "ls"
	logicalRouterKey              = "lr"
	portGroupKey                  = "pg"
	networkPolicyKey              = "np"
	sgKey                         = "sg"
	associatedSgKeyPrefix         = "associated_sg_"
	sgsKey                        = "security_groups"
	u2oKey                        = "u2o"
	adminNetworkPolicyKey         = "anp"
	baselineAdminNetworkPolicyKey = "banp"
)
57

58
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
59
type Controller struct {
60
        config *Configuration
61

62
        ipam           *ovnipam.IPAM
63
        namedPort      *NamedPort
64
        anpPrioNameMap map[int32]string
65
        anpNamePrioMap map[string]int32
66

67
        OVNNbClient ovs.NbClient
68
        OVNSbClient ovs.SbClient
69

70
        // ExternalGatewayType define external gateway type, centralized
71
        ExternalGatewayType string
72

73
        podsLister             v1.PodLister
74
        podsSynced             cache.InformerSynced
75
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
76
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
77
        deletingPodObjMap      *xsync.MapOf[string, *corev1.Pod]
78
        deletingNodeObjMap     *xsync.MapOf[string, *corev1.Node]
79
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
80
        podKeyMutex            keymutex.KeyMutex
81

82
        vpcsLister           kubeovnlister.VpcLister
83
        vpcSynced            cache.InformerSynced
84
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
85
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
86
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
87
        vpcKeyMutex          keymutex.KeyMutex
88

89
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
90
        vpcNatGatewaySynced           cache.InformerSynced
91
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
92
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
93
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
94
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
95
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
96
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
97
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
98
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
99
        vpcNatGwKeyMutex              keymutex.KeyMutex
100

101
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
102
        vpcEgressGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
106

107
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
108
        switchLBRuleSynced      cache.InformerSynced
109
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
110
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
111
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
112

113
        vpcDNSLister           kubeovnlister.VpcDnsLister
114
        vpcDNSSynced           cache.InformerSynced
115
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
116
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
117

118
        subnetsLister           kubeovnlister.SubnetLister
119
        subnetSynced            cache.InformerSynced
120
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
121
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
122
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
123
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
124
        subnetKeyMutex          keymutex.KeyMutex
125

126
        ippoolLister            kubeovnlister.IPPoolLister
127
        ippoolSynced            cache.InformerSynced
128
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
129
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
130
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
131
        ippoolKeyMutex          keymutex.KeyMutex
132

133
        ipsLister     kubeovnlister.IPLister
134
        ipSynced      cache.InformerSynced
135
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
136
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
137
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
138

139
        virtualIpsLister          kubeovnlister.VipLister
140
        virtualIpsSynced          cache.InformerSynced
141
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
142
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
143
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
144
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
145

146
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
147
        iptablesEipSynced      cache.InformerSynced
148
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
149
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
150
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
151
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
152

153
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
154
        iptablesFipSynced      cache.InformerSynced
155
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
156
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
157
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
158

159
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
160
        iptablesDnatRuleSynced      cache.InformerSynced
161
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
162
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
163
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
164

165
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
166
        iptablesSnatRuleSynced      cache.InformerSynced
167
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
168
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
169
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
170

171
        ovnEipsLister     kubeovnlister.OvnEipLister
172
        ovnEipSynced      cache.InformerSynced
173
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
174
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
175
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
176
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
177

178
        ovnFipsLister     kubeovnlister.OvnFipLister
179
        ovnFipSynced      cache.InformerSynced
180
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
181
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
182
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
183

184
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
185
        ovnSnatRuleSynced      cache.InformerSynced
186
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
187
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
188
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
189

190
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
191
        ovnDnatRuleSynced      cache.InformerSynced
192
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
193
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
194
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
195

196
        providerNetworksLister kubeovnlister.ProviderNetworkLister
197
        providerNetworkSynced  cache.InformerSynced
198

199
        vlansLister     kubeovnlister.VlanLister
200
        vlanSynced      cache.InformerSynced
201
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
202
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
203
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
204
        vlanKeyMutex    keymutex.KeyMutex
205

206
        namespacesLister  v1.NamespaceLister
207
        namespacesSynced  cache.InformerSynced
208
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
209
        nsKeyMutex        keymutex.KeyMutex
210

211
        nodesLister     v1.NodeLister
212
        nodesSynced     cache.InformerSynced
213
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
214
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
215
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
216
        nodeKeyMutex    keymutex.KeyMutex
217

218
        servicesLister     v1.ServiceLister
219
        serviceSynced      cache.InformerSynced
220
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
221
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
222
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
223
        svcKeyMutex        keymutex.KeyMutex
224

225
        endpointsLister          v1.EndpointsLister
226
        endpointsSynced          cache.InformerSynced
227
        addOrUpdateEndpointQueue workqueue.TypedRateLimitingInterface[string]
228
        epKeyMutex               keymutex.KeyMutex
229

230
        deploymentsLister appsv1.DeploymentLister
231
        deploymentsSynced cache.InformerSynced
232

233
        npsLister     netv1.NetworkPolicyLister
234
        npsSynced     cache.InformerSynced
235
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
236
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
237
        npKeyMutex    keymutex.KeyMutex
238

239
        sgsLister          kubeovnlister.SecurityGroupLister
240
        sgSynced           cache.InformerSynced
241
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
242
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
243
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
244
        sgKeyMutex         keymutex.KeyMutex
245

246
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
247
        qosPolicySynced      cache.InformerSynced
248
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
249
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
250
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
251

252
        configMapsLister v1.ConfigMapLister
253
        configMapsSynced cache.InformerSynced
254

255
        anpsLister     anplister.AdminNetworkPolicyLister
256
        anpsSynced     cache.InformerSynced
257
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
258
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
259
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
260
        anpKeyMutex    keymutex.KeyMutex
261

262
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
263
        banpsSynced     cache.InformerSynced
264
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
265
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
266
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
267
        banpKeyMutex    keymutex.KeyMutex
268

269
        csrLister           certListerv1.CertificateSigningRequestLister
270
        csrSynced           cache.InformerSynced
271
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
272

273
        vmiMigrationSynced           cache.InformerSynced
274
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
275
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
276

277
        recorder               record.EventRecorder
278
        informerFactory        kubeinformers.SharedInformerFactory
279
        cmInformerFactory      kubeinformers.SharedInformerFactory
280
        deployInformerFactory  kubeinformers.SharedInformerFactory
281
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
282
        anpInformerFactory     anpinformer.SharedInformerFactory
283
}
284

285
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
286
        if rateLimiter == nil {
2✔
287
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
288
        }
1✔
289
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
290
}
291

292
// Run creates and runs a new ovn controller
293
func Run(ctx context.Context, config *Configuration) {
×
294
        klog.V(4).Info("Creating event broadcaster")
×
295
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
296
        eventBroadcaster.StartLogging(klog.Infof)
×
297
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
298
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
299
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
300
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
301
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
302
        )
×
303

×
304
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
305
        if err != nil {
×
306
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
307
        }
×
308

309
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
310
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
311
                        listOption.AllowWatchBookmarks = true
×
312
                }))
×
313
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
314
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
315
                        listOption.AllowWatchBookmarks = true
×
316
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
317
        // deployment informer used to list/watch vpc egress gateway workloads
318
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
319
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
320
                        listOption.AllowWatchBookmarks = true
×
321
                        listOption.LabelSelector = selector.String()
×
322
                }))
×
323
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
324
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
325
                        listOption.AllowWatchBookmarks = true
×
326
                }))
×
327
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
328
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
329
                        listOption.AllowWatchBookmarks = true
×
330
                }))
×
331

332
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
333

×
334
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
335
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
336
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
337
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
338
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
339
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
340
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
341
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
342
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
343
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
344
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
345
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
346
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
347
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
348
        podInformer := informerFactory.Core().V1().Pods()
×
349
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
350
        nodeInformer := informerFactory.Core().V1().Nodes()
×
351
        serviceInformer := informerFactory.Core().V1().Services()
×
352
        endpointInformer := informerFactory.Core().V1().Endpoints()
×
353
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
354
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
355
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
356
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
357
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
358
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
359
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
360
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
361
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
362
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
363
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
364
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
365
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
366
        vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()
×
367

×
NEW
368
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
369
        controller := &Controller{
×
370
                config:             config,
×
371
                deletingPodObjMap:  xsync.NewMapOf[string, *corev1.Pod](),
×
372
                deletingNodeObjMap: xsync.NewMapOf[string, *corev1.Node](),
×
373
                ipam:               ovnipam.NewIPAM(),
×
374
                namedPort:          NewNamedPort(),
×
375

×
376
                vpcsLister:           vpcInformer.Lister(),
×
377
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
378
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
379
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
380
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
381
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
382

×
383
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
384
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
385
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
386
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
387
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
388
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
389
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
390
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
391
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
392
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
393
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
394

×
395
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
396
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
397
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
398
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
399
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
400

×
401
                subnetsLister:           subnetInformer.Lister(),
×
402
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
403
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
404
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
405
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
406
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
407
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
408

×
409
                ippoolLister:            ippoolInformer.Lister(),
×
410
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
411
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
412
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
413
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
414
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
415

×
416
                ipsLister:     ipInformer.Lister(),
×
417
                ipSynced:      ipInformer.Informer().HasSynced,
×
418
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
419
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
420
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
421

×
422
                virtualIpsLister:          virtualIPInformer.Lister(),
×
423
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
424
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
425
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
426
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
427
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
428

×
429
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
430
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
431
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
432
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
433
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
434
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
435

×
436
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
437
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
438
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
439
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
440
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
441

×
442
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
443
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
444
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
445
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
446
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
447

×
448
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
449
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
450
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
451
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
452
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
453

×
454
                vlansLister:     vlanInformer.Lister(),
×
455
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
456
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
457
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
458
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
459
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
460

×
461
                providerNetworksLister: providerNetworkInformer.Lister(),
×
462
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
463

×
464
                podsLister:          podInformer.Lister(),
×
465
                podsSynced:          podInformer.Informer().HasSynced,
×
466
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
467
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
468
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
469
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
470
                                Name:          "DeletePod",
×
471
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
472
                        },
×
473
                ),
×
474
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
475
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
476

×
477
                namespacesLister:  namespaceInformer.Lister(),
×
478
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
479
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
480
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
481

×
482
                nodesLister:     nodeInformer.Lister(),
×
483
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
484
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
485
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
486
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
487
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
488

×
489
                servicesLister:     serviceInformer.Lister(),
×
490
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
491
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
492
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
493
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
494
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
495

×
496
                endpointsLister:          endpointInformer.Lister(),
×
497
                endpointsSynced:          endpointInformer.Informer().HasSynced,
×
498
                addOrUpdateEndpointQueue: newTypedRateLimitingQueue[string]("UpdateEndpoint", nil),
×
499
                epKeyMutex:               keymutex.NewHashed(numKeyLocks),
×
500

×
501
                deploymentsLister: deploymentInformer.Lister(),
×
502
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
503

×
504
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
505
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
506
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
507
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
508
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
509

×
510
                configMapsLister: configMapInformer.Lister(),
×
511
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
512

×
513
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
514
                sgsLister:          sgInformer.Lister(),
×
515
                sgSynced:           sgInformer.Informer().HasSynced,
×
516
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
517
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
518
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
519

×
520
                ovnEipsLister:     ovnEipInformer.Lister(),
×
521
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
522
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
523
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
524
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
525
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
526

×
527
                ovnFipsLister:     ovnFipInformer.Lister(),
×
528
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
529
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
530
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
531
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
532

×
533
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
534
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
535
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
536
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
537
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
538

×
539
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
540
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
541
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
542
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
543
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
544

×
545
                csrLister:           csrInformer.Lister(),
×
546
                csrSynced:           csrInformer.Informer().HasSynced,
×
547
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
548

×
549
                vmiMigrationSynced:           vmiMigrationInformer.HasSynced,
×
550
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
551
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
552

×
553
                recorder:               recorder,
×
554
                informerFactory:        informerFactory,
×
555
                cmInformerFactory:      cmInformerFactory,
×
556
                deployInformerFactory:  deployInformerFactory,
×
557
                kubeovnInformerFactory: kubeovnInformerFactory,
×
558
                anpInformerFactory:     anpInformerFactory,
×
559
        }
×
560

×
561
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
562
                config.OvnNbAddr,
×
563
                config.OvnTimeout,
×
564
                config.OvsDbConnectTimeout,
×
565
                config.OvsDbInactivityTimeout,
×
566
                config.OvsDbConnectMaxRetry,
×
567
        ); err != nil {
×
568
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
569
        }
×
570
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
571
                config.OvnSbAddr,
×
572
                config.OvnTimeout,
×
573
                config.OvsDbConnectTimeout,
×
574
                config.OvsDbInactivityTimeout,
×
575
                config.OvsDbConnectMaxRetry,
×
576
        ); err != nil {
×
577
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
578
        }
×
579
        if config.EnableLb {
×
580
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
581
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
582
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
583
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
584
                        "DeleteSwitchLBRule",
×
585
                        workqueue.NewTypedMaxOfRateLimiter(
×
586
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
587
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
588
                        ),
×
589
                )
×
590
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
591
                        "UpdateSwitchLBRule",
×
592
                        workqueue.NewTypedMaxOfRateLimiter(
×
593
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
594
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
595
                        ),
×
596
                )
×
597

×
598
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
599
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
600
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
601
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
602
        }
×
603

604
        if config.EnableNP {
×
605
                controller.npsLister = npInformer.Lister()
×
606
                controller.npsSynced = npInformer.Informer().HasSynced
×
607
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
608
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
609
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
610
        }
×
611

612
        if config.EnableANP {
×
613
                controller.anpsLister = anpInformer.Lister()
×
614
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
615
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
616
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
617
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
618
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
619

×
620
                controller.banpsLister = banpInformer.Lister()
×
621
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
622
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
623
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
624
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
625
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
626
        }
×
627

628
        defer controller.shutdown()
×
629
        klog.Info("Starting OVN controller")
×
630

×
631
        // Wait for the caches to be synced before starting workers
×
632
        controller.informerFactory.Start(ctx.Done())
×
633
        controller.cmInformerFactory.Start(ctx.Done())
×
634
        controller.deployInformerFactory.Start(ctx.Done())
×
635
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
636
        controller.anpInformerFactory.Start(ctx.Done())
×
637

×
638
        klog.Info("Waiting for informer caches to sync")
×
639
        cacheSyncs := []cache.InformerSynced{
×
640
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
641
                controller.vpcSynced, controller.subnetSynced,
×
642
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
643
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
644
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
645
                controller.serviceSynced, controller.endpointsSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
646
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
647
                controller.ovnDnatRuleSynced,
×
648
        }
×
649
        if controller.config.EnableLb {
×
650
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
651
        }
×
652
        if controller.config.EnableNP {
×
653
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
654
        }
×
655
        if controller.config.EnableANP {
×
656
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
657
        }
×
658

659
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
660
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
661
        }
×
662

663
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
664
                AddFunc:    controller.enqueueAddPod,
×
665
                DeleteFunc: controller.enqueueDeletePod,
×
666
                UpdateFunc: controller.enqueueUpdatePod,
×
667
        }); err != nil {
×
668
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
669
        }
×
670

671
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
672
                AddFunc:    controller.enqueueAddNamespace,
×
673
                UpdateFunc: controller.enqueueUpdateNamespace,
×
674
                DeleteFunc: controller.enqueueDeleteNamespace,
×
675
        }); err != nil {
×
676
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
677
        }
×
678

679
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
680
                AddFunc:    controller.enqueueAddNode,
×
681
                UpdateFunc: controller.enqueueUpdateNode,
×
682
                DeleteFunc: controller.enqueueDeleteNode,
×
683
        }); err != nil {
×
684
                util.LogFatalAndExit(err, "failed to add node event handler")
×
685
        }
×
686

687
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
688
                AddFunc:    controller.enqueueAddService,
×
689
                DeleteFunc: controller.enqueueDeleteService,
×
690
                UpdateFunc: controller.enqueueUpdateService,
×
691
        }); err != nil {
×
692
                util.LogFatalAndExit(err, "failed to add service event handler")
×
693
        }
×
694

695
        if _, err = endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
696
                AddFunc:    controller.enqueueAddEndpoint,
×
697
                UpdateFunc: controller.enqueueUpdateEndpoint,
×
698
        }); err != nil {
×
699
                util.LogFatalAndExit(err, "failed to add endpoint event handler")
×
700
        }
×
701

702
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
703
                AddFunc:    controller.enqueueAddDeployment,
×
704
                UpdateFunc: controller.enqueueUpdateDeployment,
×
705
        }); err != nil {
×
706
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
707
        }
×
708

709
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
710
                AddFunc:    controller.enqueueAddVpc,
×
711
                UpdateFunc: controller.enqueueUpdateVpc,
×
712
                DeleteFunc: controller.enqueueDelVpc,
×
713
        }); err != nil {
×
714
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
715
        }
×
716

717
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
718
                AddFunc:    controller.enqueueAddVpcNatGw,
×
719
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
720
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
721
        }); err != nil {
×
722
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
723
        }
×
724

725
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
726
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
727
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
728
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
729
        }); err != nil {
×
730
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
731
        }
×
732

733
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
734
                AddFunc:    controller.enqueueAddSubnet,
×
735
                UpdateFunc: controller.enqueueUpdateSubnet,
×
736
                DeleteFunc: controller.enqueueDeleteSubnet,
×
737
        }); err != nil {
×
738
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
739
        }
×
740

741
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
742
                AddFunc:    controller.enqueueAddIPPool,
×
743
                UpdateFunc: controller.enqueueUpdateIPPool,
×
744
                DeleteFunc: controller.enqueueDeleteIPPool,
×
745
        }); err != nil {
×
746
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
747
        }
×
748

749
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
750
                AddFunc:    controller.enqueueAddIP,
×
751
                UpdateFunc: controller.enqueueUpdateIP,
×
752
                DeleteFunc: controller.enqueueDelIP,
×
753
        }); err != nil {
×
754
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
755
        }
×
756

757
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
758
                AddFunc:    controller.enqueueAddVlan,
×
759
                DeleteFunc: controller.enqueueDelVlan,
×
760
                UpdateFunc: controller.enqueueUpdateVlan,
×
761
        }); err != nil {
×
762
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
763
        }
×
764

765
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
766
                AddFunc:    controller.enqueueAddSg,
×
767
                DeleteFunc: controller.enqueueDeleteSg,
×
768
                UpdateFunc: controller.enqueueUpdateSg,
×
769
        }); err != nil {
×
770
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
771
        }
×
772

773
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
774
                AddFunc:    controller.enqueueAddVirtualIP,
×
775
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
776
                DeleteFunc: controller.enqueueDelVirtualIP,
×
777
        }); err != nil {
×
778
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
779
        }
×
780

781
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
782
                AddFunc:    controller.enqueueAddIptablesEip,
×
783
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
784
                DeleteFunc: controller.enqueueDelIptablesEip,
×
785
        }); err != nil {
×
786
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
787
        }
×
788

789
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
790
                AddFunc:    controller.enqueueAddIptablesFip,
×
791
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
792
                DeleteFunc: controller.enqueueDelIptablesFip,
×
793
        }); err != nil {
×
794
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
795
        }
×
796

797
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
798
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
799
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
800
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
801
        }); err != nil {
×
802
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
803
        }
×
804

805
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
806
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
807
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
808
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
809
        }); err != nil {
×
810
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
811
        }
×
812

813
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
814
                AddFunc:    controller.enqueueAddOvnEip,
×
815
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
816
                DeleteFunc: controller.enqueueDelOvnEip,
×
817
        }); err != nil {
×
818
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
819
        }
×
820

821
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
822
                AddFunc:    controller.enqueueAddOvnFip,
×
823
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
824
                DeleteFunc: controller.enqueueDelOvnFip,
×
825
        }); err != nil {
×
826
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
827
        }
×
828

829
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
830
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
831
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
832
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
833
        }); err != nil {
×
834
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
835
        }
×
836

837
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
838
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
839
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
840
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
841
        }); err != nil {
×
842
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
843
        }
×
844

845
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
846
                AddFunc:    controller.enqueueAddQoSPolicy,
×
847
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
848
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
849
        }); err != nil {
×
850
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
851
        }
×
852

853
        if config.EnableLb {
×
854
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
855
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
856
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
857
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
858
                }); err != nil {
×
859
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
860
                }
×
861

862
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
863
                        AddFunc:    controller.enqueueAddVpcDNS,
×
864
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
865
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
866
                }); err != nil {
×
867
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
868
                }
×
869
        }
870

871
        if config.EnableNP {
×
872
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                        AddFunc:    controller.enqueueAddNp,
×
874
                        UpdateFunc: controller.enqueueUpdateNp,
×
875
                        DeleteFunc: controller.enqueueDeleteNp,
×
876
                }); err != nil {
×
877
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
878
                }
×
879
        }
880

881
        if config.EnableANP {
×
882
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
883
                        AddFunc:    controller.enqueueAddAnp,
×
884
                        UpdateFunc: controller.enqueueUpdateAnp,
×
885
                        DeleteFunc: controller.enqueueDeleteAnp,
×
886
                }); err != nil {
×
887
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
888
                }
×
889

890
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
891
                        AddFunc:    controller.enqueueAddBanp,
×
892
                        UpdateFunc: controller.enqueueUpdateBanp,
×
893
                        DeleteFunc: controller.enqueueDeleteBanp,
×
894
                }); err != nil {
×
895
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
896
                }
×
897

898
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
899
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
900
        }
901

902
        if config.EnableOVNIPSec {
×
903
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
904
                        AddFunc:    controller.enqueueAddCsr,
×
905
                        UpdateFunc: controller.enqueueUpdateCsr,
×
906
                        // no need to add delete func for csr
×
907
                }); err != nil {
×
908
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
909
                }
×
910
        }
911

912
        if config.EnableLiveMigrationOptimize {
×
913
                if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
914
                        AddFunc:    controller.enqueueAddVMIMigration,
×
915
                        UpdateFunc: controller.enqueueUpdateVMIMigration,
×
916
                }); err != nil {
×
917
                        util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
×
918
                }
×
919
                controller.StartMigrationInformerFactory(ctx, kubevirtInformerFactory)
×
920
        }
921

922
        controller.Run(ctx)
×
923
}
924

925
// Run will set up the event handlers for types we are interested in, as well
926
// as syncing informer caches and starting workers. It will block until stopCh
927
// is closed, at which point it will shutdown the workqueue and wait for
928
// workers to finish processing their current work items.
929
func (c *Controller) Run(ctx context.Context) {
×
930
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
931
        // Otherwise, the init process should be placed after all workers have already started working
×
932
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
933
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
934
        }
×
935

936
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
937
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
938
        }
×
939

940
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
941
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
942
        }
×
943

944
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
945
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
946
        }
×
947

948
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
950
        }
×
951

952
        if err := c.InitOVN(); err != nil {
×
953
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
954
        }
×
955

956
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
957
        if err := c.syncIPCR(); err != nil {
×
958
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
959
        }
×
960

961
        if err := c.syncFinalizers(); err != nil {
×
962
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
963
        }
×
964

965
        if err := c.InitIPAM(); err != nil {
×
966
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
967
        }
×
968

969
        if err := c.syncNodeRoutes(); err != nil {
×
970
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
971
        }
×
972

973
        if err := c.syncSubnetCR(); err != nil {
×
974
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
975
        }
×
976

977
        if err := c.syncVlanCR(); err != nil {
×
978
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
979
        }
×
980

981
        if c.config.EnableOVNIPSec {
×
982
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
983
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
984
                }
×
985
        }
986

987
        // start workers to do all the network operations
988
        c.startWorkers(ctx)
×
989

×
990
        c.initResourceOnce()
×
991
        <-ctx.Done()
×
992
        klog.Info("Shutting down workers")
×
993
}
994

995
func (c *Controller) shutdown() {
×
996
        utilruntime.HandleCrash()
×
997

×
998
        c.addOrUpdatePodQueue.ShutDown()
×
999
        c.deletePodQueue.ShutDown()
×
1000
        c.updatePodSecurityQueue.ShutDown()
×
1001

×
1002
        c.addNamespaceQueue.ShutDown()
×
1003

×
1004
        c.addOrUpdateSubnetQueue.ShutDown()
×
1005
        c.deleteSubnetQueue.ShutDown()
×
1006
        c.updateSubnetStatusQueue.ShutDown()
×
1007
        c.syncVirtualPortsQueue.ShutDown()
×
1008

×
1009
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1010
        c.updateIPPoolStatusQueue.ShutDown()
×
1011
        c.deleteIPPoolQueue.ShutDown()
×
1012

×
1013
        c.addNodeQueue.ShutDown()
×
1014
        c.updateNodeQueue.ShutDown()
×
1015
        c.deleteNodeQueue.ShutDown()
×
1016

×
1017
        c.addServiceQueue.ShutDown()
×
1018
        c.deleteServiceQueue.ShutDown()
×
1019
        c.updateServiceQueue.ShutDown()
×
1020
        c.addOrUpdateEndpointQueue.ShutDown()
×
1021

×
1022
        c.addVlanQueue.ShutDown()
×
1023
        c.delVlanQueue.ShutDown()
×
1024
        c.updateVlanQueue.ShutDown()
×
1025

×
1026
        c.addOrUpdateVpcQueue.ShutDown()
×
1027
        c.updateVpcStatusQueue.ShutDown()
×
1028
        c.delVpcQueue.ShutDown()
×
1029

×
1030
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1031
        c.initVpcNatGatewayQueue.ShutDown()
×
1032
        c.delVpcNatGatewayQueue.ShutDown()
×
1033
        c.updateVpcEipQueue.ShutDown()
×
1034
        c.updateVpcFloatingIPQueue.ShutDown()
×
1035
        c.updateVpcDnatQueue.ShutDown()
×
1036
        c.updateVpcSnatQueue.ShutDown()
×
1037
        c.updateVpcSubnetQueue.ShutDown()
×
1038

×
1039
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1040
        c.delVpcEgressGatewayQueue.ShutDown()
×
1041

×
1042
        if c.config.EnableLb {
×
1043
                c.addSwitchLBRuleQueue.ShutDown()
×
1044
                c.delSwitchLBRuleQueue.ShutDown()
×
1045
                c.updateSwitchLBRuleQueue.ShutDown()
×
1046

×
1047
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1048
                c.delVpcDNSQueue.ShutDown()
×
1049
        }
×
1050

1051
        c.addIPQueue.ShutDown()
×
1052
        c.updateIPQueue.ShutDown()
×
1053
        c.delIPQueue.ShutDown()
×
1054

×
1055
        c.addVirtualIPQueue.ShutDown()
×
1056
        c.updateVirtualIPQueue.ShutDown()
×
1057
        c.updateVirtualParentsQueue.ShutDown()
×
1058
        c.delVirtualIPQueue.ShutDown()
×
1059

×
1060
        c.addIptablesEipQueue.ShutDown()
×
1061
        c.updateIptablesEipQueue.ShutDown()
×
1062
        c.resetIptablesEipQueue.ShutDown()
×
1063
        c.delIptablesEipQueue.ShutDown()
×
1064

×
1065
        c.addIptablesFipQueue.ShutDown()
×
1066
        c.updateIptablesFipQueue.ShutDown()
×
1067
        c.delIptablesFipQueue.ShutDown()
×
1068

×
1069
        c.addIptablesDnatRuleQueue.ShutDown()
×
1070
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1071
        c.delIptablesDnatRuleQueue.ShutDown()
×
1072

×
1073
        c.addIptablesSnatRuleQueue.ShutDown()
×
1074
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1075
        c.delIptablesSnatRuleQueue.ShutDown()
×
1076

×
1077
        c.addQoSPolicyQueue.ShutDown()
×
1078
        c.updateQoSPolicyQueue.ShutDown()
×
1079
        c.delQoSPolicyQueue.ShutDown()
×
1080

×
1081
        c.addOvnEipQueue.ShutDown()
×
1082
        c.updateOvnEipQueue.ShutDown()
×
1083
        c.resetOvnEipQueue.ShutDown()
×
1084
        c.delOvnEipQueue.ShutDown()
×
1085

×
1086
        c.addOvnFipQueue.ShutDown()
×
1087
        c.updateOvnFipQueue.ShutDown()
×
1088
        c.delOvnFipQueue.ShutDown()
×
1089

×
1090
        c.addOvnSnatRuleQueue.ShutDown()
×
1091
        c.updateOvnSnatRuleQueue.ShutDown()
×
1092
        c.delOvnSnatRuleQueue.ShutDown()
×
1093

×
1094
        c.addOvnDnatRuleQueue.ShutDown()
×
1095
        c.updateOvnDnatRuleQueue.ShutDown()
×
1096
        c.delOvnDnatRuleQueue.ShutDown()
×
1097

×
1098
        if c.config.EnableNP {
×
1099
                c.updateNpQueue.ShutDown()
×
1100
                c.deleteNpQueue.ShutDown()
×
1101
        }
×
1102
        if c.config.EnableANP {
×
1103
                c.addAnpQueue.ShutDown()
×
1104
                c.updateAnpQueue.ShutDown()
×
1105
                c.deleteAnpQueue.ShutDown()
×
1106

×
1107
                c.addBanpQueue.ShutDown()
×
1108
                c.updateBanpQueue.ShutDown()
×
1109
                c.deleteBanpQueue.ShutDown()
×
1110
        }
×
1111

1112
        c.addOrUpdateSgQueue.ShutDown()
×
1113
        c.delSgQueue.ShutDown()
×
1114
        c.syncSgPortsQueue.ShutDown()
×
1115

×
1116
        c.addOrUpdateCsrQueue.ShutDown()
×
1117

×
1118
        if c.config.EnableLiveMigrationOptimize {
×
1119
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1120
        }
×
1121
}
1122

1123
func (c *Controller) startWorkers(ctx context.Context) {
×
1124
        klog.Info("Starting workers")
×
1125

×
1126
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1127
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1128
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1129

×
1130
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1131
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1132
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1133
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1134
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1135
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1136
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1137
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1138
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1139
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1140
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1141
        // add default and join subnet and wait them ready
×
1142
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1143
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1144
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1145
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1146
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1147
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1148
                klog.Infof("wait for subnets %v ready", subnets)
×
1149

×
1150
                return c.allSubnetReady(subnets...)
×
1151
        })
×
1152
        if err != nil {
×
1153
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1154
        }
×
1155

1156
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1157
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1158
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1159

×
1160
        // run node worker before handle any pods
×
NEW
1161
        for range c.config.WorkerNum {
×
1162
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1163
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1164
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1165
        }
×
1166
        for {
×
1167
                ready := true
×
1168
                time.Sleep(3 * time.Second)
×
1169
                nodes, err := c.nodesLister.List(labels.Everything())
×
1170
                if err != nil {
×
1171
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1172
                }
×
1173
                for _, node := range nodes {
×
1174
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1175
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1176
                                ready = false
×
1177
                                break
×
1178
                        }
1179
                }
1180
                if ready {
×
1181
                        break
×
1182
                }
1183
        }
1184

1185
        if c.config.EnableLb {
×
1186
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1187
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1188
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1189

×
1190
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1191
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1192
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1193

×
1194
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1195
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1196
                go wait.Until(func() {
×
1197
                        c.resyncVpcDNSConfig()
×
1198
                }, 5*time.Second, ctx.Done())
×
1199
        }
1200

NEW
1201
        for range c.config.WorkerNum {
×
1202
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1203
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1204
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1205

×
1206
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1207
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1208
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1209
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1210
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1211

×
1212
                if c.config.EnableLb {
×
1213
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1214
                        go wait.Until(runWorker("add/update endpoint", c.addOrUpdateEndpointQueue, c.handleUpdateEndpoint), time.Second, ctx.Done())
×
1215
                }
×
1216

1217
                if c.config.EnableNP {
×
1218
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1219
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1220
                }
×
1221

1222
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1223
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1224
        }
1225

1226
        if c.config.EnableEipSnat {
×
1227
                go wait.Until(func() {
×
1228
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1229
                        c.resyncExternalGateway()
×
1230
                }, time.Second, ctx.Done())
×
1231

1232
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1233
                c.OVNNbClient.MonitorBFD()
×
1234
        }
1235
        // TODO: we should merge these two vpc nat config into one config and resync them together
1236
        go wait.Until(func() {
×
1237
                c.resyncVpcNatGwConfig()
×
1238
        }, time.Second, ctx.Done())
×
1239

1240
        go wait.Until(func() {
×
1241
                c.resyncVpcNatConfig()
×
1242
        }, time.Second, ctx.Done())
×
1243

1244
        if c.config.GCInterval != 0 {
×
1245
                go wait.Until(func() {
×
1246
                        if err := c.markAndCleanLSP(); err != nil {
×
1247
                                klog.Errorf("gc lsp error: %v", err)
×
1248
                        }
×
1249
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1250
        }
1251

1252
        go wait.Until(func() {
×
1253
                if err := c.inspectPod(); err != nil {
×
1254
                        klog.Errorf("inspection error: %v", err)
×
1255
                }
×
1256
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1257

1258
        if c.config.EnableExternalVpc {
×
1259
                go wait.Until(func() {
×
1260
                        c.syncExternalVpc()
×
1261
                }, 5*time.Second, ctx.Done())
×
1262
        }
1263

1264
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1265
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1266
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1267

×
1268
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1269
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1272

×
1273
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1274
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1276

×
1277
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1278
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1280

×
1281
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1282
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1283
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1284

×
1285
        if c.config.EnableNP {
×
1286
                go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1287
        }
×
1288

1289
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1290
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1291
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1292

×
1293
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1294
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1295
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1296
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1297

×
1298
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1301
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1302

×
1303
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1305
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1306

×
1307
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1308
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1309
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1310

×
1311
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1312
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1313
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1314

×
1315
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1316
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1317
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1318

×
1319
        if c.config.EnableANP {
×
1320
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1321
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1322
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1323

×
1324
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1325
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1326
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1327
        }
×
1328

1329
        if c.config.EnableLiveMigrationOptimize {
×
1330
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1331
        }
×
1332
}
1333

1334
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1335
        for _, lsName := range subnets {
2✔
1336
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1337
                if err != nil {
1✔
1338
                        klog.Error(err)
×
1339
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1340
                }
×
1341

1342
                if !exist {
2✔
1343
                        return false, nil
1✔
1344
                }
1✔
1345
        }
1346

1347
        return true, nil
1✔
1348
}
1349

1350
func (c *Controller) initResourceOnce() {
×
1351
        c.registerSubnetMetrics()
×
1352

×
1353
        if err := c.initNodeChassis(); err != nil {
×
1354
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1355
        }
×
1356

1357
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1358
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1359
        }
×
1360
        if err := c.syncSecurityGroup(); err != nil {
×
1361
                util.LogFatalAndExit(err, "failed to sync security group")
×
1362
        }
×
1363

1364
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1365
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1366
        }
×
1367

1368
        if err := c.initVpcNatGw(); err != nil {
×
1369
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1370
        }
×
1371
        if c.config.EnableLb {
×
1372
                if err := c.initVpcDNSConfig(); err != nil {
×
1373
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1374
                }
×
1375
        }
1376

1377
        // remove resources in ovndb that not exist any more in kubernetes resources
1378
        // process gc at last in case of affecting other init process
1379
        if err := c.gc(); err != nil {
×
1380
                util.LogFatalAndExit(err, "failed to run gc")
×
1381
        }
×
1382
}
1383

1384
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1385
        item, shutdown := queue.Get()
×
1386
        if shutdown {
×
1387
                return false
×
1388
        }
×
1389

1390
        err := func(item T) error {
×
1391
                defer queue.Done(item)
×
1392
                if err := handler(item); err != nil {
×
1393
                        queue.AddRateLimited(item)
×
1394
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1395
                }
×
1396
                queue.Forget(item)
×
1397
                return nil
×
1398
        }(item)
1399
        if err != nil {
×
1400
                utilruntime.HandleError(err)
×
1401
                return true
×
1402
        }
×
1403
        return true
×
1404
}
1405

1406
func getWorkItemKey(obj any) string {
×
1407
        switch v := obj.(type) {
×
1408
        case string:
×
1409
                return v
×
1410
        case *vpcService:
×
1411
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1412
        case *AdminNetworkPolicyChangedDelta:
×
1413
                return v.key
×
1414
        case *SlrInfo:
×
1415
                return v.Name
×
1416
        default:
×
1417
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1418
                if err != nil {
×
1419
                        utilruntime.HandleError(err)
×
1420
                        return ""
×
1421
                }
×
1422
                return key
×
1423
        }
1424
}
1425

1426
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1427
        return func() {
×
1428
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1429
                }
×
1430
        }
1431
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc