• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 15203338777

23 May 2025 05:52AM UTC coverage: 21.844% (-0.03%) from 21.876%
15203338777

push

github

web-flow
feature: traffic policy (#5263)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

0 of 94 new or added lines in 4 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

10364 of 47446 relevant lines covered (21.84%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.28
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        "github.com/puzpuzpuz/xsync/v3"
11
        "golang.org/x/time/rate"
12
        corev1 "k8s.io/api/core/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/apimachinery/pkg/util/wait"
17
        kubeinformers "k8s.io/client-go/informers"
18
        "k8s.io/client-go/kubernetes/scheme"
19
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
20
        appsv1 "k8s.io/client-go/listers/apps/v1"
21
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
22
        v1 "k8s.io/client-go/listers/core/v1"
23
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
24
        netv1 "k8s.io/client-go/listers/networking/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        "k8s.io/utils/keymutex"
30
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
31
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
32
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
33

34
        "github.com/kubeovn/kube-ovn/pkg/informer"
35

36
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
37
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
38
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
39
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
40
        "github.com/kubeovn/kube-ovn/pkg/ovs"
41
        "github.com/kubeovn/kube-ovn/pkg/util"
42
)
43

44
// controllerAgentName is the component name set on the event source of the
// event recorder that Run creates.
const controllerAgentName = "kube-ovn-controller"
45

46
// String keys used by the controller package. Their consumers are not visible
// in this part of the file.
// NOTE(review): these look like tag/external-ID keys attached to OVN objects
// (logical switch, logical router, port group, ...) — confirm at the call sites.
const (
	logicalSwitchKey              = "ls"
	logicalRouterKey              = "lr"
	portGroupKey                  = "pg"
	networkPolicyKey              = "np"
	sgKey                         = "sg"
	associatedSgKeyPrefix         = "associated_sg_"
	sgsKey                        = "security_groups"
	u2oKey                        = "u2o"
	adminNetworkPolicyKey         = "anp"
	baselineAdminNetworkPolicyKey = "banp"
)
58

59
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
60
type Controller struct {
61
        config *Configuration
62

63
        ipam           *ovnipam.IPAM
64
        namedPort      *NamedPort
65
        anpPrioNameMap map[int32]string
66
        anpNamePrioMap map[string]int32
67

68
        OVNNbClient ovs.NbClient
69
        OVNSbClient ovs.SbClient
70

71
        // ExternalGatewayType define external gateway type, centralized
72
        ExternalGatewayType string
73

74
        podsLister             v1.PodLister
75
        podsSynced             cache.InformerSynced
76
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
77
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
78
        deletingPodObjMap      *xsync.MapOf[string, *corev1.Pod]
79
        deletingNodeObjMap     *xsync.MapOf[string, *corev1.Node]
80
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
81
        podKeyMutex            keymutex.KeyMutex
82

83
        vpcsLister           kubeovnlister.VpcLister
84
        vpcSynced            cache.InformerSynced
85
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
86
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
87
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
88
        vpcKeyMutex          keymutex.KeyMutex
89

90
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
91
        vpcNatGatewaySynced           cache.InformerSynced
92
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
93
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
94
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
95
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
96
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
97
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
98
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
99
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
100
        vpcNatGwKeyMutex              keymutex.KeyMutex
101

102
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
103
        vpcEgressGatewaySynced           cache.InformerSynced
104
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
105
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
106
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
107

108
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
109
        switchLBRuleSynced      cache.InformerSynced
110
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
111
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
112
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
113

114
        vpcDNSLister           kubeovnlister.VpcDnsLister
115
        vpcDNSSynced           cache.InformerSynced
116
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
118

119
        subnetsLister           kubeovnlister.SubnetLister
120
        subnetSynced            cache.InformerSynced
121
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
122
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
123
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
124
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
125
        subnetKeyMutex          keymutex.KeyMutex
126

127
        ippoolLister            kubeovnlister.IPPoolLister
128
        ippoolSynced            cache.InformerSynced
129
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
130
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
131
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
132
        ippoolKeyMutex          keymutex.KeyMutex
133

134
        ipsLister     kubeovnlister.IPLister
135
        ipSynced      cache.InformerSynced
136
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
137
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
138
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
139

140
        virtualIpsLister          kubeovnlister.VipLister
141
        virtualIpsSynced          cache.InformerSynced
142
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
143
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
144
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
145
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
146

147
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
148
        iptablesEipSynced      cache.InformerSynced
149
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
150
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
151
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
152
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
153

154
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
155
        iptablesFipSynced      cache.InformerSynced
156
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
157
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
158
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
159

160
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
161
        iptablesDnatRuleSynced      cache.InformerSynced
162
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
163
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
164
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
165

166
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
167
        iptablesSnatRuleSynced      cache.InformerSynced
168
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
169
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
170
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
171

172
        ovnEipsLister     kubeovnlister.OvnEipLister
173
        ovnEipSynced      cache.InformerSynced
174
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
175
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
176
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
177
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
178

179
        ovnFipsLister     kubeovnlister.OvnFipLister
180
        ovnFipSynced      cache.InformerSynced
181
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
182
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
183
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
184

185
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
186
        ovnSnatRuleSynced      cache.InformerSynced
187
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
188
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
189
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
190

191
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
192
        ovnDnatRuleSynced      cache.InformerSynced
193
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
195
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
196

197
        providerNetworksLister kubeovnlister.ProviderNetworkLister
198
        providerNetworkSynced  cache.InformerSynced
199

200
        vlansLister     kubeovnlister.VlanLister
201
        vlanSynced      cache.InformerSynced
202
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
203
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
204
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
205
        vlanKeyMutex    keymutex.KeyMutex
206

207
        namespacesLister  v1.NamespaceLister
208
        namespacesSynced  cache.InformerSynced
209
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
210
        nsKeyMutex        keymutex.KeyMutex
211

212
        nodesLister     v1.NodeLister
213
        nodesSynced     cache.InformerSynced
214
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
215
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
216
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
217
        nodeKeyMutex    keymutex.KeyMutex
218

219
        servicesLister     v1.ServiceLister
220
        serviceSynced      cache.InformerSynced
221
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
222
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
223
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
224
        svcKeyMutex        keymutex.KeyMutex
225

226
        endpointSlicesLister          discoveryv1.EndpointSliceLister
227
        endpointSlicesSynced          cache.InformerSynced
228
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
229
        epKeyMutex                    keymutex.KeyMutex
230

231
        deploymentsLister appsv1.DeploymentLister
232
        deploymentsSynced cache.InformerSynced
233

234
        npsLister     netv1.NetworkPolicyLister
235
        npsSynced     cache.InformerSynced
236
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
237
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
238
        npKeyMutex    keymutex.KeyMutex
239

240
        sgsLister          kubeovnlister.SecurityGroupLister
241
        sgSynced           cache.InformerSynced
242
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
243
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
244
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
245
        sgKeyMutex         keymutex.KeyMutex
246

247
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
248
        qosPolicySynced      cache.InformerSynced
249
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
250
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
251
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
252

253
        configMapsLister v1.ConfigMapLister
254
        configMapsSynced cache.InformerSynced
255

256
        anpsLister     anplister.AdminNetworkPolicyLister
257
        anpsSynced     cache.InformerSynced
258
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
259
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
260
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
261
        anpKeyMutex    keymutex.KeyMutex
262

263
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
264
        banpsSynced     cache.InformerSynced
265
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
266
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
267
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
268
        banpKeyMutex    keymutex.KeyMutex
269

270
        csrLister           certListerv1.CertificateSigningRequestLister
271
        csrSynced           cache.InformerSynced
272
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
273

274
        vmiMigrationSynced           cache.InformerSynced
275
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
276
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
277

278
        recorder               record.EventRecorder
279
        informerFactory        kubeinformers.SharedInformerFactory
280
        cmInformerFactory      kubeinformers.SharedInformerFactory
281
        deployInformerFactory  kubeinformers.SharedInformerFactory
282
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
283
        anpInformerFactory     anpinformer.SharedInformerFactory
284
}
285

286
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
287
        if rateLimiter == nil {
2✔
288
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
289
        }
1✔
290
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
291
}
292

293
// Run creates and runs a new ovn controller
294
func Run(ctx context.Context, config *Configuration) {
×
295
        klog.V(4).Info("Creating event broadcaster")
×
296
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
297
        eventBroadcaster.StartLogging(klog.Infof)
×
298
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
299
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
300
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
301
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
302
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
303
        )
×
304

×
305
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
306
        if err != nil {
×
307
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
308
        }
×
309

310
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
311
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
312
                        listOption.AllowWatchBookmarks = true
×
313
                }))
×
314
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
315
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
316
                        listOption.AllowWatchBookmarks = true
×
317
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
318
        // deployment informer used to list/watch vpc egress gateway workloads
319
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
320
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
321
                        listOption.AllowWatchBookmarks = true
×
322
                        listOption.LabelSelector = selector.String()
×
323
                }))
×
324
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
325
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
326
                        listOption.AllowWatchBookmarks = true
×
327
                }))
×
328
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
329
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
330
                        listOption.AllowWatchBookmarks = true
×
331
                }))
×
332

333
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
334

×
335
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
336
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
337
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
338
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
339
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
340
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
341
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
342
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
343
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
344
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
345
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
346
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
347
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
348
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
349
        podInformer := informerFactory.Core().V1().Pods()
×
350
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
351
        nodeInformer := informerFactory.Core().V1().Nodes()
×
352
        serviceInformer := informerFactory.Core().V1().Services()
×
353
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
354
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
355
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
356
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
357
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
358
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
359
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
360
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
361
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
362
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
363
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
364
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
365
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
366
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
367
        vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()
×
368

×
369
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
370
        controller := &Controller{
×
371
                config:             config,
×
372
                deletingPodObjMap:  xsync.NewMapOf[string, *corev1.Pod](),
×
373
                deletingNodeObjMap: xsync.NewMapOf[string, *corev1.Node](),
×
374
                ipam:               ovnipam.NewIPAM(),
×
375
                namedPort:          NewNamedPort(),
×
376

×
377
                vpcsLister:           vpcInformer.Lister(),
×
378
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
379
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
380
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
381
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
382
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
383

×
384
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
385
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
386
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
387
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
388
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
389
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
390
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
391
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
392
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
393
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
394
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
395

×
396
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
397
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
398
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
399
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
400
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
401

×
402
                subnetsLister:           subnetInformer.Lister(),
×
403
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
404
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
405
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
406
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
407
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
408
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
409

×
410
                ippoolLister:            ippoolInformer.Lister(),
×
411
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
412
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
413
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
414
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
415
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
416

×
417
                ipsLister:     ipInformer.Lister(),
×
418
                ipSynced:      ipInformer.Informer().HasSynced,
×
419
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
420
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
421
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
422

×
423
                virtualIpsLister:          virtualIPInformer.Lister(),
×
424
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
425
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
426
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
427
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
428
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
429

×
430
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
431
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
432
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
433
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
434
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
435
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
436

×
437
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
438
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
439
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
440
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
441
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
442

×
443
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
444
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
445
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
446
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
447
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
448

×
449
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
450
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
451
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
452
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
453
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
454

×
455
                vlansLister:     vlanInformer.Lister(),
×
456
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
457
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
458
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
459
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
460
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
461

×
462
                providerNetworksLister: providerNetworkInformer.Lister(),
×
463
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
464

×
465
                podsLister:          podInformer.Lister(),
×
466
                podsSynced:          podInformer.Informer().HasSynced,
×
467
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
468
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
469
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
470
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
471
                                Name:          "DeletePod",
×
472
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
473
                        },
×
474
                ),
×
475
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
476
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
477

×
478
                namespacesLister:  namespaceInformer.Lister(),
×
479
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
480
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
481
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
482

×
483
                nodesLister:     nodeInformer.Lister(),
×
484
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
485
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
486
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
487
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
488
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
489

×
490
                servicesLister:     serviceInformer.Lister(),
×
491
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
492
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
493
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
494
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
495
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
496

×
497
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
498
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
499
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
500
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
501

×
502
                deploymentsLister: deploymentInformer.Lister(),
×
503
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
504

×
505
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
506
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
507
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
508
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
509
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
510

×
511
                configMapsLister: configMapInformer.Lister(),
×
512
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
513

×
514
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
515
                sgsLister:          sgInformer.Lister(),
×
516
                sgSynced:           sgInformer.Informer().HasSynced,
×
517
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
518
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
519
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
520

×
521
                ovnEipsLister:     ovnEipInformer.Lister(),
×
522
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
523
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
524
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
525
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
526
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
527

×
528
                ovnFipsLister:     ovnFipInformer.Lister(),
×
529
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
530
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
531
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
532
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
533

×
534
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
535
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
536
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
537
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
538
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
539

×
540
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
541
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
542
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
543
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
544
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
545

×
546
                csrLister:           csrInformer.Lister(),
×
547
                csrSynced:           csrInformer.Informer().HasSynced,
×
548
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
549

×
550
                vmiMigrationSynced:           vmiMigrationInformer.HasSynced,
×
551
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
552
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
553

×
554
                recorder:               recorder,
×
555
                informerFactory:        informerFactory,
×
556
                cmInformerFactory:      cmInformerFactory,
×
557
                deployInformerFactory:  deployInformerFactory,
×
558
                kubeovnInformerFactory: kubeovnInformerFactory,
×
559
                anpInformerFactory:     anpInformerFactory,
×
560
        }
×
561

×
562
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
563
                config.OvnNbAddr,
×
564
                config.OvnTimeout,
×
565
                config.OvsDbConnectTimeout,
×
566
                config.OvsDbInactivityTimeout,
×
567
                config.OvsDbConnectMaxRetry,
×
568
        ); err != nil {
×
569
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
570
        }
×
571
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
572
                config.OvnSbAddr,
×
573
                config.OvnTimeout,
×
574
                config.OvsDbConnectTimeout,
×
575
                config.OvsDbInactivityTimeout,
×
576
                config.OvsDbConnectMaxRetry,
×
577
        ); err != nil {
×
578
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
579
        }
×
580
        if config.EnableLb {
×
581
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
582
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
583
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
584
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
585
                        "DeleteSwitchLBRule",
×
586
                        workqueue.NewTypedMaxOfRateLimiter(
×
587
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
588
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
589
                        ),
×
590
                )
×
591
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
592
                        "UpdateSwitchLBRule",
×
593
                        workqueue.NewTypedMaxOfRateLimiter(
×
594
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
595
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
596
                        ),
×
597
                )
×
598

×
599
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
600
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
601
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
602
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
603
        }
×
604

605
        if config.EnableNP {
×
606
                controller.npsLister = npInformer.Lister()
×
607
                controller.npsSynced = npInformer.Informer().HasSynced
×
608
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
609
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
610
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
611
        }
×
612

613
        if config.EnableANP {
×
614
                controller.anpsLister = anpInformer.Lister()
×
615
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
616
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
617
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
618
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
619
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
620

×
621
                controller.banpsLister = banpInformer.Lister()
×
622
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
623
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
624
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
625
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
626
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
627
        }
×
628

629
        defer controller.shutdown()
×
630
        klog.Info("Starting OVN controller")
×
631

×
632
        // Wait for the caches to be synced before starting workers
×
633
        controller.informerFactory.Start(ctx.Done())
×
634
        controller.cmInformerFactory.Start(ctx.Done())
×
635
        controller.deployInformerFactory.Start(ctx.Done())
×
636
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
637
        controller.anpInformerFactory.Start(ctx.Done())
×
638

×
639
        klog.Info("Waiting for informer caches to sync")
×
640
        cacheSyncs := []cache.InformerSynced{
×
641
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
642
                controller.vpcSynced, controller.subnetSynced,
×
643
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
644
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
645
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
646
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
647
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
648
                controller.ovnDnatRuleSynced,
×
649
        }
×
650
        if controller.config.EnableLb {
×
651
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
652
        }
×
653
        if controller.config.EnableNP {
×
654
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
655
        }
×
656
        if controller.config.EnableANP {
×
657
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
658
        }
×
659

660
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
661
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
662
        }
×
663

664
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
665
                AddFunc:    controller.enqueueAddPod,
×
666
                DeleteFunc: controller.enqueueDeletePod,
×
667
                UpdateFunc: controller.enqueueUpdatePod,
×
668
        }); err != nil {
×
669
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
670
        }
×
671

672
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
673
                AddFunc:    controller.enqueueAddNamespace,
×
674
                UpdateFunc: controller.enqueueUpdateNamespace,
×
675
                DeleteFunc: controller.enqueueDeleteNamespace,
×
676
        }); err != nil {
×
677
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
678
        }
×
679

680
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
681
                AddFunc:    controller.enqueueAddNode,
×
682
                UpdateFunc: controller.enqueueUpdateNode,
×
683
                DeleteFunc: controller.enqueueDeleteNode,
×
684
        }); err != nil {
×
685
                util.LogFatalAndExit(err, "failed to add node event handler")
×
686
        }
×
687

688
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
689
                AddFunc:    controller.enqueueAddService,
×
690
                DeleteFunc: controller.enqueueDeleteService,
×
691
                UpdateFunc: controller.enqueueUpdateService,
×
692
        }); err != nil {
×
693
                util.LogFatalAndExit(err, "failed to add service event handler")
×
694
        }
×
695

696
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
697
                AddFunc:    controller.enqueueAddEndpointSlice,
×
698
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
699
        }); err != nil {
×
700
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
701
        }
×
702

703
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
704
                AddFunc:    controller.enqueueAddDeployment,
×
705
                UpdateFunc: controller.enqueueUpdateDeployment,
×
706
        }); err != nil {
×
707
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
708
        }
×
709

710
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
711
                AddFunc:    controller.enqueueAddVpc,
×
712
                UpdateFunc: controller.enqueueUpdateVpc,
×
713
                DeleteFunc: controller.enqueueDelVpc,
×
714
        }); err != nil {
×
715
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
716
        }
×
717

718
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
719
                AddFunc:    controller.enqueueAddVpcNatGw,
×
720
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
721
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
722
        }); err != nil {
×
723
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
724
        }
×
725

726
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
727
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
728
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
729
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
730
        }); err != nil {
×
731
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
732
        }
×
733

734
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
735
                AddFunc:    controller.enqueueAddSubnet,
×
736
                UpdateFunc: controller.enqueueUpdateSubnet,
×
737
                DeleteFunc: controller.enqueueDeleteSubnet,
×
738
        }); err != nil {
×
739
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
740
        }
×
741

742
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
743
                AddFunc:    controller.enqueueAddIPPool,
×
744
                UpdateFunc: controller.enqueueUpdateIPPool,
×
745
                DeleteFunc: controller.enqueueDeleteIPPool,
×
746
        }); err != nil {
×
747
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
748
        }
×
749

750
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
751
                AddFunc:    controller.enqueueAddIP,
×
752
                UpdateFunc: controller.enqueueUpdateIP,
×
753
                DeleteFunc: controller.enqueueDelIP,
×
754
        }); err != nil {
×
755
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
756
        }
×
757

758
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
759
                AddFunc:    controller.enqueueAddVlan,
×
760
                DeleteFunc: controller.enqueueDelVlan,
×
761
                UpdateFunc: controller.enqueueUpdateVlan,
×
762
        }); err != nil {
×
763
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
764
        }
×
765

766
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
767
                AddFunc:    controller.enqueueAddSg,
×
768
                DeleteFunc: controller.enqueueDeleteSg,
×
769
                UpdateFunc: controller.enqueueUpdateSg,
×
770
        }); err != nil {
×
771
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
772
        }
×
773

774
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
775
                AddFunc:    controller.enqueueAddVirtualIP,
×
776
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
777
                DeleteFunc: controller.enqueueDelVirtualIP,
×
778
        }); err != nil {
×
779
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
780
        }
×
781

782
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
783
                AddFunc:    controller.enqueueAddIptablesEip,
×
784
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
785
                DeleteFunc: controller.enqueueDelIptablesEip,
×
786
        }); err != nil {
×
787
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
788
        }
×
789

790
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
791
                AddFunc:    controller.enqueueAddIptablesFip,
×
792
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
793
                DeleteFunc: controller.enqueueDelIptablesFip,
×
794
        }); err != nil {
×
795
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
796
        }
×
797

798
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
799
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
800
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
801
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
802
        }); err != nil {
×
803
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
804
        }
×
805

806
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
807
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
808
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
809
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
810
        }); err != nil {
×
811
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
812
        }
×
813

814
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
815
                AddFunc:    controller.enqueueAddOvnEip,
×
816
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
817
                DeleteFunc: controller.enqueueDelOvnEip,
×
818
        }); err != nil {
×
819
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
820
        }
×
821

822
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
823
                AddFunc:    controller.enqueueAddOvnFip,
×
824
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
825
                DeleteFunc: controller.enqueueDelOvnFip,
×
826
        }); err != nil {
×
827
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
828
        }
×
829

830
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
831
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
832
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
833
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
834
        }); err != nil {
×
835
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
836
        }
×
837

838
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
839
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
840
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
841
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
842
        }); err != nil {
×
843
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
844
        }
×
845

846
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
847
                AddFunc:    controller.enqueueAddQoSPolicy,
×
848
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
849
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
850
        }); err != nil {
×
851
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
852
        }
×
853

854
        if config.EnableLb {
×
855
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
857
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
858
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
859
                }); err != nil {
×
860
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
861
                }
×
862

863
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                        AddFunc:    controller.enqueueAddVpcDNS,
×
865
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
866
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
867
                }); err != nil {
×
868
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
869
                }
×
870
        }
871

872
        if config.EnableNP {
×
873
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
874
                        AddFunc:    controller.enqueueAddNp,
×
875
                        UpdateFunc: controller.enqueueUpdateNp,
×
876
                        DeleteFunc: controller.enqueueDeleteNp,
×
877
                }); err != nil {
×
878
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
879
                }
×
880
        }
881

882
        if config.EnableANP {
×
883
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
884
                        AddFunc:    controller.enqueueAddAnp,
×
885
                        UpdateFunc: controller.enqueueUpdateAnp,
×
886
                        DeleteFunc: controller.enqueueDeleteAnp,
×
887
                }); err != nil {
×
888
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
889
                }
×
890

891
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
892
                        AddFunc:    controller.enqueueAddBanp,
×
893
                        UpdateFunc: controller.enqueueUpdateBanp,
×
894
                        DeleteFunc: controller.enqueueDeleteBanp,
×
895
                }); err != nil {
×
896
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
897
                }
×
898

899
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
900
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
901
        }
902

903
        if config.EnableOVNIPSec {
×
904
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                        AddFunc:    controller.enqueueAddCsr,
×
906
                        UpdateFunc: controller.enqueueUpdateCsr,
×
907
                        // no need to add delete func for csr
×
908
                }); err != nil {
×
909
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
910
                }
×
911
        }
912

913
        if config.EnableLiveMigrationOptimize {
×
914
                if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
915
                        AddFunc:    controller.enqueueAddVMIMigration,
×
916
                        UpdateFunc: controller.enqueueUpdateVMIMigration,
×
917
                }); err != nil {
×
918
                        util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
×
919
                }
×
920
                controller.StartMigrationInformerFactory(ctx, kubevirtInformerFactory)
×
921
        }
922

923
        controller.Run(ctx)
×
924
}
925

926
// Run will set up the event handlers for types we are interested in, as well
927
// as syncing informer caches and starting workers. It will block until stopCh
928
// is closed, at which point it will shutdown the workqueue and wait for
929
// workers to finish processing their current work items.
930
func (c *Controller) Run(ctx context.Context) {
×
931
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
932
        // Otherwise, the init process should be placed after all workers have already started working
×
933
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
934
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
935
        }
×
936

937
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
938
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
939
        }
×
940

941
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
942
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
943
        }
×
944

945
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
946
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
947
        }
×
948

949
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
950
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
951
        }
×
952

953
        if err := c.InitOVN(); err != nil {
×
954
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
955
        }
×
956

957
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
958
        if err := c.syncIPCR(); err != nil {
×
959
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
960
        }
×
961

962
        if err := c.syncFinalizers(); err != nil {
×
963
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
964
        }
×
965

966
        if err := c.InitIPAM(); err != nil {
×
967
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
968
        }
×
969

970
        if err := c.syncNodeRoutes(); err != nil {
×
971
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
972
        }
×
973

974
        if err := c.syncSubnetCR(); err != nil {
×
975
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
976
        }
×
977

978
        if err := c.syncVlanCR(); err != nil {
×
979
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
980
        }
×
981

982
        if c.config.EnableOVNIPSec {
×
983
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
984
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
985
                }
×
986
        }
987

988
        // start workers to do all the network operations
989
        c.startWorkers(ctx)
×
990

×
991
        c.initResourceOnce()
×
992
        <-ctx.Done()
×
993
        klog.Info("Shutting down workers")
×
994
}
995

996
func (c *Controller) shutdown() {
×
997
        utilruntime.HandleCrash()
×
998

×
999
        c.addOrUpdatePodQueue.ShutDown()
×
1000
        c.deletePodQueue.ShutDown()
×
1001
        c.updatePodSecurityQueue.ShutDown()
×
1002

×
1003
        c.addNamespaceQueue.ShutDown()
×
1004

×
1005
        c.addOrUpdateSubnetQueue.ShutDown()
×
1006
        c.deleteSubnetQueue.ShutDown()
×
1007
        c.updateSubnetStatusQueue.ShutDown()
×
1008
        c.syncVirtualPortsQueue.ShutDown()
×
1009

×
1010
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1011
        c.updateIPPoolStatusQueue.ShutDown()
×
1012
        c.deleteIPPoolQueue.ShutDown()
×
1013

×
1014
        c.addNodeQueue.ShutDown()
×
1015
        c.updateNodeQueue.ShutDown()
×
1016
        c.deleteNodeQueue.ShutDown()
×
1017

×
1018
        c.addServiceQueue.ShutDown()
×
1019
        c.deleteServiceQueue.ShutDown()
×
1020
        c.updateServiceQueue.ShutDown()
×
1021
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1022

×
1023
        c.addVlanQueue.ShutDown()
×
1024
        c.delVlanQueue.ShutDown()
×
1025
        c.updateVlanQueue.ShutDown()
×
1026

×
1027
        c.addOrUpdateVpcQueue.ShutDown()
×
1028
        c.updateVpcStatusQueue.ShutDown()
×
1029
        c.delVpcQueue.ShutDown()
×
1030

×
1031
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1032
        c.initVpcNatGatewayQueue.ShutDown()
×
1033
        c.delVpcNatGatewayQueue.ShutDown()
×
1034
        c.updateVpcEipQueue.ShutDown()
×
1035
        c.updateVpcFloatingIPQueue.ShutDown()
×
1036
        c.updateVpcDnatQueue.ShutDown()
×
1037
        c.updateVpcSnatQueue.ShutDown()
×
1038
        c.updateVpcSubnetQueue.ShutDown()
×
1039

×
1040
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1041
        c.delVpcEgressGatewayQueue.ShutDown()
×
1042

×
1043
        if c.config.EnableLb {
×
1044
                c.addSwitchLBRuleQueue.ShutDown()
×
1045
                c.delSwitchLBRuleQueue.ShutDown()
×
1046
                c.updateSwitchLBRuleQueue.ShutDown()
×
1047

×
1048
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1049
                c.delVpcDNSQueue.ShutDown()
×
1050
        }
×
1051

1052
        c.addIPQueue.ShutDown()
×
1053
        c.updateIPQueue.ShutDown()
×
1054
        c.delIPQueue.ShutDown()
×
1055

×
1056
        c.addVirtualIPQueue.ShutDown()
×
1057
        c.updateVirtualIPQueue.ShutDown()
×
1058
        c.updateVirtualParentsQueue.ShutDown()
×
1059
        c.delVirtualIPQueue.ShutDown()
×
1060

×
1061
        c.addIptablesEipQueue.ShutDown()
×
1062
        c.updateIptablesEipQueue.ShutDown()
×
1063
        c.resetIptablesEipQueue.ShutDown()
×
1064
        c.delIptablesEipQueue.ShutDown()
×
1065

×
1066
        c.addIptablesFipQueue.ShutDown()
×
1067
        c.updateIptablesFipQueue.ShutDown()
×
1068
        c.delIptablesFipQueue.ShutDown()
×
1069

×
1070
        c.addIptablesDnatRuleQueue.ShutDown()
×
1071
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1072
        c.delIptablesDnatRuleQueue.ShutDown()
×
1073

×
1074
        c.addIptablesSnatRuleQueue.ShutDown()
×
1075
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1076
        c.delIptablesSnatRuleQueue.ShutDown()
×
1077

×
1078
        c.addQoSPolicyQueue.ShutDown()
×
1079
        c.updateQoSPolicyQueue.ShutDown()
×
1080
        c.delQoSPolicyQueue.ShutDown()
×
1081

×
1082
        c.addOvnEipQueue.ShutDown()
×
1083
        c.updateOvnEipQueue.ShutDown()
×
1084
        c.resetOvnEipQueue.ShutDown()
×
1085
        c.delOvnEipQueue.ShutDown()
×
1086

×
1087
        c.addOvnFipQueue.ShutDown()
×
1088
        c.updateOvnFipQueue.ShutDown()
×
1089
        c.delOvnFipQueue.ShutDown()
×
1090

×
1091
        c.addOvnSnatRuleQueue.ShutDown()
×
1092
        c.updateOvnSnatRuleQueue.ShutDown()
×
1093
        c.delOvnSnatRuleQueue.ShutDown()
×
1094

×
1095
        c.addOvnDnatRuleQueue.ShutDown()
×
1096
        c.updateOvnDnatRuleQueue.ShutDown()
×
1097
        c.delOvnDnatRuleQueue.ShutDown()
×
1098

×
1099
        if c.config.EnableNP {
×
1100
                c.updateNpQueue.ShutDown()
×
1101
                c.deleteNpQueue.ShutDown()
×
1102
        }
×
1103
        if c.config.EnableANP {
×
1104
                c.addAnpQueue.ShutDown()
×
1105
                c.updateAnpQueue.ShutDown()
×
1106
                c.deleteAnpQueue.ShutDown()
×
1107

×
1108
                c.addBanpQueue.ShutDown()
×
1109
                c.updateBanpQueue.ShutDown()
×
1110
                c.deleteBanpQueue.ShutDown()
×
1111
        }
×
1112

1113
        c.addOrUpdateSgQueue.ShutDown()
×
1114
        c.delSgQueue.ShutDown()
×
1115
        c.syncSgPortsQueue.ShutDown()
×
1116

×
1117
        c.addOrUpdateCsrQueue.ShutDown()
×
1118

×
1119
        if c.config.EnableLiveMigrationOptimize {
×
1120
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1121
        }
×
1122
}
1123

1124
func (c *Controller) startWorkers(ctx context.Context) {
×
1125
        klog.Info("Starting workers")
×
1126

×
1127
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1128
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1129
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1130

×
1131
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1132
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1133
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1134
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1135
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1136
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1137
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1138
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1139
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1140
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1141
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1142
        // add default and join subnet and wait them ready
×
1143
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1144
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1145
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1146
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1147
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1148
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1149
                klog.Infof("wait for subnets %v ready", subnets)
×
1150

×
1151
                return c.allSubnetReady(subnets...)
×
1152
        })
×
1153
        if err != nil {
×
1154
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1155
        }
×
1156

1157
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1158
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1159
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1160

×
1161
        // run node worker before handle any pods
×
1162
        for range c.config.WorkerNum {
×
1163
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1164
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1165
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1166
        }
×
1167
        for {
×
1168
                ready := true
×
1169
                time.Sleep(3 * time.Second)
×
1170
                nodes, err := c.nodesLister.List(labels.Everything())
×
1171
                if err != nil {
×
1172
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1173
                }
×
1174
                for _, node := range nodes {
×
1175
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1176
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1177
                                ready = false
×
1178
                                break
×
1179
                        }
1180
                }
1181
                if ready {
×
1182
                        break
×
1183
                }
1184
        }
1185

1186
        if c.config.EnableLb {
×
1187
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1188
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1189
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1190

×
1191
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1192
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1193
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1194

×
1195
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1196
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1197
                go wait.Until(func() {
×
1198
                        c.resyncVpcDNSConfig()
×
1199
                }, 5*time.Second, ctx.Done())
×
1200
        }
1201

1202
        for range c.config.WorkerNum {
×
1203
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1204
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1205
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1206

×
1207
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1208
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1209
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1210
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1211
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1212

×
1213
                if c.config.EnableLb {
×
1214
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1215
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1216
                }
×
1217

1218
                if c.config.EnableNP {
×
1219
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1220
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1221
                }
×
1222

1223
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1224
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1225
        }
1226

1227
        if c.config.EnableEipSnat {
×
1228
                go wait.Until(func() {
×
1229
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1230
                        c.resyncExternalGateway()
×
1231
                }, time.Second, ctx.Done())
×
1232

1233
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1234
                c.OVNNbClient.MonitorBFD()
×
1235
        }
1236
        // TODO: we should merge these two vpc nat config into one config and resync them together
1237
        go wait.Until(func() {
×
1238
                c.resyncVpcNatGwConfig()
×
1239
        }, time.Second, ctx.Done())
×
1240

1241
        go wait.Until(func() {
×
1242
                c.resyncVpcNatConfig()
×
1243
        }, time.Second, ctx.Done())
×
1244

1245
        if c.config.GCInterval != 0 {
×
1246
                go wait.Until(func() {
×
1247
                        if err := c.markAndCleanLSP(); err != nil {
×
1248
                                klog.Errorf("gc lsp error: %v", err)
×
1249
                        }
×
1250
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1251
        }
1252

1253
        go wait.Until(func() {
×
1254
                if err := c.inspectPod(); err != nil {
×
1255
                        klog.Errorf("inspection error: %v", err)
×
1256
                }
×
1257
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1258

1259
        if c.config.EnableExternalVpc {
×
1260
                go wait.Until(func() {
×
1261
                        c.syncExternalVpc()
×
1262
                }, 5*time.Second, ctx.Done())
×
1263
        }
1264

1265
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1266
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1267
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1268

×
1269
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1272
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1273

×
1274
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1276
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1277

×
1278
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1280
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1281

×
1282
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1283
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1284
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1285

×
NEW
1286
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1287

×
1288
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1289
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1290
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1291

×
1292
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1293
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1294
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1295
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1296

×
1297
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1298
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1301

×
1302
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1303
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1305

×
1306
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1307
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1308
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1309

×
1310
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1311
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1312
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1313

×
1314
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1315
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1316
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1317

×
1318
        if c.config.EnableANP {
×
1319
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1320
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1321
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1322

×
1323
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1324
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1325
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1326
        }
×
1327

1328
        if c.config.EnableLiveMigrationOptimize {
×
1329
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1330
        }
×
1331
}
1332

1333
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1334
        for _, lsName := range subnets {
2✔
1335
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1336
                if err != nil {
1✔
1337
                        klog.Error(err)
×
1338
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1339
                }
×
1340

1341
                if !exist {
2✔
1342
                        return false, nil
1✔
1343
                }
1✔
1344
        }
1345

1346
        return true, nil
1✔
1347
}
1348

1349
func (c *Controller) initResourceOnce() {
×
1350
        c.registerSubnetMetrics()
×
1351

×
1352
        if err := c.initNodeChassis(); err != nil {
×
1353
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1354
        }
×
1355

1356
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1357
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1358
        }
×
1359
        if err := c.syncSecurityGroup(); err != nil {
×
1360
                util.LogFatalAndExit(err, "failed to sync security group")
×
1361
        }
×
1362

1363
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1364
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1365
        }
×
1366

1367
        if err := c.initVpcNatGw(); err != nil {
×
1368
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1369
        }
×
1370
        if c.config.EnableLb {
×
1371
                if err := c.initVpcDNSConfig(); err != nil {
×
1372
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1373
                }
×
1374
        }
1375

1376
        // remove resources in ovndb that not exist any more in kubernetes resources
1377
        // process gc at last in case of affecting other init process
1378
        if err := c.gc(); err != nil {
×
1379
                util.LogFatalAndExit(err, "failed to run gc")
×
1380
        }
×
1381
}
1382

1383
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1384
        item, shutdown := queue.Get()
×
1385
        if shutdown {
×
1386
                return false
×
1387
        }
×
1388

1389
        err := func(item T) error {
×
1390
                defer queue.Done(item)
×
1391
                if err := handler(item); err != nil {
×
1392
                        queue.AddRateLimited(item)
×
1393
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1394
                }
×
1395
                queue.Forget(item)
×
1396
                return nil
×
1397
        }(item)
1398
        if err != nil {
×
1399
                utilruntime.HandleError(err)
×
1400
                return true
×
1401
        }
×
1402
        return true
×
1403
}
1404

1405
func getWorkItemKey(obj any) string {
×
1406
        switch v := obj.(type) {
×
1407
        case string:
×
1408
                return v
×
1409
        case *vpcService:
×
1410
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1411
        case *AdminNetworkPolicyChangedDelta:
×
1412
                return v.key
×
1413
        case *SlrInfo:
×
1414
                return v.Name
×
1415
        default:
×
1416
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1417
                if err != nil {
×
1418
                        utilruntime.HandleError(err)
×
1419
                        return ""
×
1420
                }
×
1421
                return key
×
1422
        }
1423
}
1424

1425
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1426
        return func() {
×
1427
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1428
                }
×
1429
        }
1430
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc