• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13276279838

05 Dec 2024 10:10AM UTC coverage: 22.382% (+0.06%) from 22.323%
13276279838

Pull #4768

github

hackerain
delete legacy acls when upgrading to v1.13.x (#4742)

The ACLs in v1.13.x are in tier 2 rather than tier 0 as in v1.12.x. The legacy ACLs
may cause unexpected behavior because ACLs in tier 0 have the highest priority.
We should delete the legacy ACLs and recreate them when upgrading to v1.13.x.

Signed-off-by: suo <yugsuo@gmail.com>
Pull Request #4768: delete legacy acls when upgrading to v1.13.x (#4742)

47 of 137 new or added lines in 9 files covered. (34.31%)

14 existing lines in 5 files now uncovered.

10429 of 46596 relevant lines covered (22.38%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.25
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        "github.com/puzpuzpuz/xsync/v3"
11
        "golang.org/x/time/rate"
12
        corev1 "k8s.io/api/core/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/apimachinery/pkg/util/wait"
17
        kubeinformers "k8s.io/client-go/informers"
18
        "k8s.io/client-go/kubernetes/scheme"
19
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
20
        appsv1 "k8s.io/client-go/listers/apps/v1"
21
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
22
        v1 "k8s.io/client-go/listers/core/v1"
23
        netv1 "k8s.io/client-go/listers/networking/v1"
24
        "k8s.io/client-go/tools/cache"
25
        "k8s.io/client-go/tools/record"
26
        "k8s.io/client-go/util/workqueue"
27
        "k8s.io/klog/v2"
28
        "k8s.io/utils/keymutex"
29
        kubevirtController "kubevirt.io/kubevirt/pkg/controller"
30
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
31
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
32
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
33

34
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
35
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
36
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
37
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
38
        "github.com/kubeovn/kube-ovn/pkg/ovs"
39
        "github.com/kubeovn/kube-ovn/pkg/util"
40
)
41

42
// controllerAgentName is the component name Run puts in the corev1.EventSource
// when it creates the event recorder.
const controllerAgentName = "kube-ovn-controller"
43

44
// Short string keys, one per resource kind handled by the controller.
// NOTE(review): their use is not visible in this part of the file — presumably
// they are keys/identifiers attached to OVN objects; confirm against callers.
const (
45
        logicalSwitchKey              = "ls"
46
        logicalRouterKey              = "lr"
47
        portGroupKey                  = "pg"
48
        networkPolicyKey              = "np"
49
        sgKey                         = "sg"
50
        associatedSgKeyPrefix         = "associated_sg_"
51
        sgsKey                        = "security_groups"
52
        u2oKey                        = "u2o"
53
        adminNetworkPolicyKey         = "anp"
54
        baselineAdminNetworkPolicyKey = "banp"
55
)
56

57
// Controller is the main kube-ovn controller: it watches ns/pod/node/svc/ep
// (and the other resource kinds listed below) and operates OVN. For each
// resource kind the struct holds a lister, an informer-synced check, typed
// rate-limiting work queues and, where present, a keyed mutex.
58
type Controller struct {
59
        // config is the controller configuration.
        config *Configuration
60

61
        // IPAM, named-port bookkeeping and ANP lookup maps
        // (presumably priority -> name and name -> priority; confirm against callers).
        ipam           *ovnipam.IPAM
62
        namedPort      *NamedPort
63
        anpPrioNameMap map[int32]string
64
        anpNamePrioMap map[string]int32
65

66
        // OVN clients (ovs.NbClient / ovs.SbClient).
        OVNNbClient ovs.NbClient
67
        OVNSbClient ovs.SbClient
68

69
        // ExternalGatewayType defines the external gateway type, centralized
70
        ExternalGatewayType string
71

72
        // Pods: lister, sync check, work queues and keyed mutex.
        // deletingPodObjMap / deletingNodeObjMap are xsync maps from string to
        // *corev1.Pod / *corev1.Node.
        podsLister             v1.PodLister
73
        podsSynced             cache.InformerSynced
74
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
75
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
76
        deletingPodObjMap      *xsync.MapOf[string, *corev1.Pod]
77
        deletingNodeObjMap     *xsync.MapOf[string, *corev1.Node]
78
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
79
        podKeyMutex            keymutex.KeyMutex
80

81
        // VPCs: lister, sync check, work queues and keyed mutex.
        // The delete queue carries *kubeovnv1.Vpc items rather than strings.
        vpcsLister           kubeovnlister.VpcLister
82
        vpcSynced            cache.InformerSynced
83
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
84
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
85
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
86
        vpcKeyMutex          keymutex.KeyMutex
87

88
        // VPC NAT gateways: lister, sync check, work queues and keyed mutex.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
89
        vpcNatGatewaySynced           cache.InformerSynced
90
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
91
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
92
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
93
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
94
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
95
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
96
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
97
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
98
        vpcNatGwKeyMutex              keymutex.KeyMutex
99

100
        // VPC egress gateways: lister, sync check, work queues and keyed mutex.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
101
        vpcEgressGatewaySynced           cache.InformerSynced
102
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
103
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
104
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
105

106
        // Switch LB rules: lister, sync check and work queues.
        // Update/delete queues carry *SlrInfo items.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
107
        switchLBRuleSynced      cache.InformerSynced
108
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
109
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
110
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
111

112
        // VPC DNS: lister, sync check and work queues.
        vpcDNSLister           kubeovnlister.VpcDnsLister
113
        vpcDNSSynced           cache.InformerSynced
114
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
115
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
116

117
        // Subnets: lister, sync check, work queues and keyed mutex.
        // The delete queue carries *kubeovnv1.Subnet items.
        subnetsLister           kubeovnlister.SubnetLister
118
        subnetSynced            cache.InformerSynced
119
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
120
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
121
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
122
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
123
        subnetKeyMutex          keymutex.KeyMutex
124

125
        // IP pools: lister, sync check, work queues and keyed mutex.
        // The delete queue carries *kubeovnv1.IPPool items.
        ippoolLister            kubeovnlister.IPPoolLister
126
        ippoolSynced            cache.InformerSynced
127
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
128
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
129
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
130
        ippoolKeyMutex          keymutex.KeyMutex
131

132
        // IPs: lister, sync check and work queues.
        // The delete queue carries *kubeovnv1.IP items.
        ipsLister     kubeovnlister.IPLister
133
        ipSynced      cache.InformerSynced
134
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
135
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
136
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
137

138
        // Virtual IPs (Vip): lister, sync check and work queues.
        // The delete queue carries *kubeovnv1.Vip items.
        virtualIpsLister          kubeovnlister.VipLister
139
        virtualIpsSynced          cache.InformerSynced
140
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
141
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
142
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
143
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
144

145
        // Iptables EIPs: lister, sync check and work queues.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
146
        iptablesEipSynced      cache.InformerSynced
147
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
148
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
149
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
150
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
151

152
        // Iptables FIP rules: lister, sync check and work queues.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
153
        iptablesFipSynced      cache.InformerSynced
154
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
155
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
156
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
157

158
        // Iptables DNAT rules: lister, sync check and work queues.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
159
        iptablesDnatRuleSynced      cache.InformerSynced
160
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
161
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
162
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
163

164
        // Iptables SNAT rules: lister, sync check and work queues.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
165
        iptablesSnatRuleSynced      cache.InformerSynced
166
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
168
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
169

170
        // OVN EIPs: lister, sync check and work queues.
        ovnEipsLister     kubeovnlister.OvnEipLister
171
        ovnEipSynced      cache.InformerSynced
172
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
173
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
174
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
175
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
176

177
        // OVN FIPs: lister, sync check and work queues.
        ovnFipsLister     kubeovnlister.OvnFipLister
178
        ovnFipSynced      cache.InformerSynced
179
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
180
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
181
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
182

183
        // OVN SNAT rules: lister, sync check and work queues.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
184
        ovnSnatRuleSynced      cache.InformerSynced
185
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
187
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        // OVN DNAT rules: lister, sync check and work queues.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
190
        ovnDnatRuleSynced      cache.InformerSynced
191
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
193
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        // Provider networks: lister and sync check only (no work queues here).
        providerNetworksLister kubeovnlister.ProviderNetworkLister
196
        providerNetworkSynced  cache.InformerSynced
197

198
        // VLANs: lister, sync check, work queues and keyed mutex.
        vlansLister     kubeovnlister.VlanLister
199
        vlanSynced      cache.InformerSynced
200
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
201
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
202
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
203
        vlanKeyMutex    keymutex.KeyMutex
204

205
        // Namespaces: lister, sync check, work queue and keyed mutex.
        namespacesLister  v1.NamespaceLister
206
        namespacesSynced  cache.InformerSynced
207
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
208
        nsKeyMutex        keymutex.KeyMutex
209

210
        // Nodes: lister, sync check, work queues and keyed mutex.
        nodesLister     v1.NodeLister
211
        nodesSynced     cache.InformerSynced
212
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
213
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
214
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
215
        nodeKeyMutex    keymutex.KeyMutex
216

217
        // Services: lister, sync check, work queues and keyed mutex.
        // Delete/update queues carry *vpcService / *updateSvcObject items.
        servicesLister     v1.ServiceLister
218
        serviceSynced      cache.InformerSynced
219
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
220
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
221
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
222
        svcKeyMutex        keymutex.KeyMutex
223

224
        // Endpoints: lister, sync check, work queue and keyed mutex.
        endpointsLister          v1.EndpointsLister
225
        endpointsSynced          cache.InformerSynced
226
        addOrUpdateEndpointQueue workqueue.TypedRateLimitingInterface[string]
227
        epKeyMutex               keymutex.KeyMutex
228

229
        // Deployments: lister and sync check only.
        deploymentsLister appsv1.DeploymentLister
230
        deploymentsSynced cache.InformerSynced
231

232
        // Network policies: lister, sync check, work queues and keyed mutex.
        npsLister     netv1.NetworkPolicyLister
233
        npsSynced     cache.InformerSynced
234
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
235
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
236
        npKeyMutex    keymutex.KeyMutex
237

238
        // Security groups: lister, sync check, work queues and keyed mutex.
        sgsLister          kubeovnlister.SecurityGroupLister
239
        sgSynced           cache.InformerSynced
240
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
241
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
242
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
243
        sgKeyMutex         keymutex.KeyMutex
244

245
        // QoS policies: lister, sync check and work queues.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
246
        qosPolicySynced      cache.InformerSynced
247
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
248
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
249
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
250

251
        // ConfigMaps: lister and sync check only.
        configMapsLister v1.ConfigMapLister
252
        configMapsSynced cache.InformerSynced
253

254
        // Admin network policies: lister, sync check, work queues and keyed mutex.
        // The update queue carries *AdminNetworkPolicyChangedDelta items and the
        // delete queue carries *v1alpha1.AdminNetworkPolicy items.
        anpsLister     anplister.AdminNetworkPolicyLister
255
        anpsSynced     cache.InformerSynced
256
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
257
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
258
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
259
        anpKeyMutex    keymutex.KeyMutex
260

261
        // Baseline admin network policies: lister, sync check, work queues and
        // keyed mutex. The update queue shares the *AdminNetworkPolicyChangedDelta
        // item type with the ANP update queue.
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
262
        banpsSynced     cache.InformerSynced
263
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
264
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
265
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
266
        banpKeyMutex    keymutex.KeyMutex
267

268
        // Certificate signing requests: lister, sync check and work queue.
        csrLister           certListerv1.CertificateSigningRequestLister
269
        csrSynced           cache.InformerSynced
270
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
271

272
        // KubeVirt VMI migrations: sync check, work queue and informer factory.
        // NOTE(review): hasKubevirtVMIMigration presumably records whether the
        // VMI migration resource is available — confirm where it is set.
        vmiMigrationSynced           cache.InformerSynced
273
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
274
        kubevirtInformerFactory      kubevirtController.KubeInformerFactory
275
        hasKubevirtVMIMigration      bool
276

277
        // Event recorder and the informer factories backing the listers above.
        recorder               record.EventRecorder
278
        informerFactory        kubeinformers.SharedInformerFactory
279
        cmInformerFactory      kubeinformers.SharedInformerFactory
280
        deployInformerFactory  kubeinformers.SharedInformerFactory
281
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
282
        anpInformerFactory     anpinformer.SharedInformerFactory
283
}
284

1✔
285
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
2✔
286
        if rateLimiter == nil {
1✔
287
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
288
        }
1✔
289
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
290
}
291

292
// Run creates and runs a new ovn controller
×
293
func Run(ctx context.Context, config *Configuration) {
×
294
        klog.V(4).Info("Creating event broadcaster")
×
295
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
296
        eventBroadcaster.StartLogging(klog.Infof)
×
297
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
298
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
299
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
300
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
301
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
302
        )
×
303

×
304
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
305
        if err != nil {
×
306
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
307
        }
308

×
309
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
310
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
311
                        listOption.AllowWatchBookmarks = true
×
312
                }))
×
313
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
314
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
315
                        listOption.AllowWatchBookmarks = true
×
316
                }), kubeinformers.WithNamespace(config.PodNamespace))
317
        // deployment informer used to list/watch vpc egress gateway workloads
×
318
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
319
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
320
                        listOption.AllowWatchBookmarks = true
×
321
                        listOption.LabelSelector = selector.String()
×
322
                }))
×
323
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
324
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
325
                        listOption.AllowWatchBookmarks = true
×
326
                }))
×
327
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
328
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
329
                        listOption.AllowWatchBookmarks = true
×
330
                }))
331

×
332
        kubevirtInformerFactory := kubevirtController.NewKubeInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
333

×
334
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
335
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
336
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
337
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
338
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
339
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
340
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
341
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
342
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
343
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
344
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
345
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
346
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
347
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
348
        podInformer := informerFactory.Core().V1().Pods()
×
349
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
350
        nodeInformer := informerFactory.Core().V1().Nodes()
×
351
        serviceInformer := informerFactory.Core().V1().Services()
×
352
        endpointInformer := informerFactory.Core().V1().Endpoints()
×
353
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
354
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
355
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
356
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
357
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
358
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
359
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
360
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
361
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
362
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
363
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
364
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
365
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
366
        vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()
×
367

×
368
        numKeyLocks := runtime.NumCPU() * 2
×
369
        if numKeyLocks < config.WorkerNum*2 {
×
370
                numKeyLocks = config.WorkerNum * 2
×
371
        }
×
372
        controller := &Controller{
×
373
                config:             config,
×
374
                deletingPodObjMap:  xsync.NewMapOf[string, *corev1.Pod](),
×
375
                deletingNodeObjMap: xsync.NewMapOf[string, *corev1.Node](),
×
376
                ipam:               ovnipam.NewIPAM(),
×
377
                namedPort:          NewNamedPort(),
×
378

×
379
                vpcsLister:           vpcInformer.Lister(),
×
380
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
381
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
382
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
383
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
384
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
385

×
386
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
387
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
388
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
389
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
390
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
391
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
392
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
393
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
394
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
395
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
396
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
397

×
398
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
399
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
400
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
401
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
402
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
403

×
404
                subnetsLister:           subnetInformer.Lister(),
×
405
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
406
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
407
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
408
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
409
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
410
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
411

×
412
                ippoolLister:            ippoolInformer.Lister(),
×
413
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
414
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
415
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
416
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
417
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
418

×
419
                ipsLister:     ipInformer.Lister(),
×
420
                ipSynced:      ipInformer.Informer().HasSynced,
×
421
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
422
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
423
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
424

×
425
                virtualIpsLister:          virtualIPInformer.Lister(),
×
426
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
427
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
428
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
429
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
430
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
431

×
432
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
433
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
434
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
435
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
436
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
437
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
438

×
439
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
440
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
441
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
442
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
443
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
444

×
445
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
446
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
447
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
448
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
449
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
450

×
451
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
452
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
453
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
454
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
455
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
456

×
457
                vlansLister:     vlanInformer.Lister(),
×
458
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
459
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
460
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
461
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
462
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
463

×
464
                providerNetworksLister: providerNetworkInformer.Lister(),
×
465
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
466

×
467
                podsLister:          podInformer.Lister(),
×
468
                podsSynced:          podInformer.Informer().HasSynced,
×
469
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
470
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
471
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
472
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
473
                                Name:          "DeletePod",
×
474
                                DelayingQueue: workqueue.TypedNewDelayingQueue[string](),
×
475
                        },
×
476
                ),
×
477
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
478
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
479

×
480
                namespacesLister:  namespaceInformer.Lister(),
×
481
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
482
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
483
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
484

×
485
                nodesLister:     nodeInformer.Lister(),
×
486
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
487
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
488
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
489
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
490
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
491

×
492
                servicesLister:     serviceInformer.Lister(),
×
493
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
494
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
495
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
496
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
497
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
498

×
499
                endpointsLister:          endpointInformer.Lister(),
×
500
                endpointsSynced:          endpointInformer.Informer().HasSynced,
×
501
                addOrUpdateEndpointQueue: newTypedRateLimitingQueue[string]("UpdateEndpoint", nil),
×
502
                epKeyMutex:               keymutex.NewHashed(numKeyLocks),
×
503

×
504
                deploymentsLister: deploymentInformer.Lister(),
×
505
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
506

×
507
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
508
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
509
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
510
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
511
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
512

×
513
                configMapsLister: configMapInformer.Lister(),
×
514
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
515

×
516
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
517
                sgsLister:          sgInformer.Lister(),
×
518
                sgSynced:           sgInformer.Informer().HasSynced,
×
519
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
520
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
521
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
522

×
523
                ovnEipsLister:     ovnEipInformer.Lister(),
×
524
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
525
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
526
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
527
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
528
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
529

×
530
                ovnFipsLister:     ovnFipInformer.Lister(),
×
531
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
532
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
533
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
534
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
535

×
536
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
537
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
538
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
539
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
540
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
541

×
542
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
543
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
544
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
545
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
546
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
547

×
548
                csrLister:           csrInformer.Lister(),
×
549
                csrSynced:           csrInformer.Informer().HasSynced,
×
550
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
551

×
552
                vmiMigrationSynced:           vmiMigrationInformer.HasSynced,
×
553
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
554
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
555

×
556
                recorder:               recorder,
×
557
                informerFactory:        informerFactory,
×
558
                cmInformerFactory:      cmInformerFactory,
×
559
                deployInformerFactory:  deployInformerFactory,
×
560
                kubeovnInformerFactory: kubeovnInformerFactory,
×
561
                anpInformerFactory:     anpInformerFactory,
×
562
        }
×
563

×
564
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
565
                config.OvnNbAddr,
×
566
                config.OvnTimeout,
×
567
                config.OvsDbConnectTimeout,
×
568
                config.OvsDbInactivityTimeout,
×
569
                config.OvsDbConnectMaxRetry,
×
570
        ); err != nil {
×
571
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
572
        }
×
573
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
574
                config.OvnSbAddr,
×
575
                config.OvnTimeout,
×
576
                config.OvsDbConnectTimeout,
×
577
                config.OvsDbInactivityTimeout,
×
578
                config.OvsDbConnectMaxRetry,
×
579
        ); err != nil {
×
580
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
581
        }
×
582
        if config.EnableLb {
×
583
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
584
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
585
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
586
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
587
                        "DeleteSwitchLBRule",
×
588
                        workqueue.NewTypedMaxOfRateLimiter(
×
589
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
590
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
591
                        ),
×
592
                )
×
593
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
594
                        "UpdateSwitchLBRule",
×
595
                        workqueue.NewTypedMaxOfRateLimiter(
×
596
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
597
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
598
                        ),
×
599
                )
×
600

×
601
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
602
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
603
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
604
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
605
        }
606

×
607
        if config.EnableNP {
×
608
                controller.npsLister = npInformer.Lister()
×
609
                controller.npsSynced = npInformer.Informer().HasSynced
×
610
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
611
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
612
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
613
        }
614

×
615
        if config.EnableANP {
×
616
                controller.anpsLister = anpInformer.Lister()
×
617
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
618
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
619
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
620
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
621
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
622

×
623
                controller.banpsLister = banpInformer.Lister()
×
624
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
625
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
626
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
627
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
628
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
629
        }
630

×
631
        defer controller.shutdown()
×
632
        klog.Info("Starting OVN controller")
×
633

×
634
        // Wait for the caches to be synced before starting workers
×
635
        controller.informerFactory.Start(ctx.Done())
×
636
        controller.cmInformerFactory.Start(ctx.Done())
×
637
        controller.deployInformerFactory.Start(ctx.Done())
×
638
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
639
        controller.anpInformerFactory.Start(ctx.Done())
×
640

×
641
        controller.hasKubevirtVMIMigration = controller.isVMIMigrationCRDInstalled()
×
642
        if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
×
643
                kubevirtInformerFactory.Start(ctx.Done())
×
644
        }
×
645

×
646
        klog.Info("Waiting for informer caches to sync")
×
647
        cacheSyncs := []cache.InformerSynced{
×
648
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
649
                controller.vpcSynced, controller.subnetSynced,
×
650
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
651
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
652
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
653
                controller.serviceSynced, controller.endpointsSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
654
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
655
                controller.ovnDnatRuleSynced,
×
656
        }
×
657
        if controller.config.EnableLb {
×
658
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
659
        }
×
660
        if controller.config.EnableNP {
661
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
662
        }
×
663
        if controller.config.EnableANP {
×
664
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
665
        }
×
666

×
667
        if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
×
668
                cacheSyncs = append(cacheSyncs, controller.vmiMigrationSynced)
×
669
        }
×
670

×
671
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
672
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
673
        }
×
674

×
675
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
676
                AddFunc:    controller.enqueueAddPod,
×
677
                DeleteFunc: controller.enqueueDeletePod,
×
678
                UpdateFunc: controller.enqueueUpdatePod,
×
679
        }); err != nil {
×
680
                util.LogFatalAndExit(err, "failed to add pod event handler")
681
        }
×
682

×
683
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
684
                AddFunc:    controller.enqueueAddNamespace,
×
685
                UpdateFunc: controller.enqueueUpdateNamespace,
×
686
                DeleteFunc: controller.enqueueDeleteNamespace,
×
687
        }); err != nil {
×
688
                util.LogFatalAndExit(err, "failed to add namespace event handler")
689
        }
×
690

×
691
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
692
                AddFunc:    controller.enqueueAddNode,
×
693
                UpdateFunc: controller.enqueueUpdateNode,
×
694
                DeleteFunc: controller.enqueueDeleteNode,
×
695
        }); err != nil {
×
696
                util.LogFatalAndExit(err, "failed to add node event handler")
697
        }
×
698

×
699
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
700
                AddFunc:    controller.enqueueAddService,
×
701
                DeleteFunc: controller.enqueueDeleteService,
×
702
                UpdateFunc: controller.enqueueUpdateService,
×
703
        }); err != nil {
704
                util.LogFatalAndExit(err, "failed to add service event handler")
×
705
        }
×
706

×
707
        if _, err = endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
708
                AddFunc:    controller.enqueueAddEndpoint,
×
709
                UpdateFunc: controller.enqueueUpdateEndpoint,
×
710
        }); err != nil {
711
                util.LogFatalAndExit(err, "failed to add endpoint event handler")
×
712
        }
×
713

×
714
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
715
                AddFunc:    controller.enqueueAddDeployment,
×
716
                UpdateFunc: controller.enqueueUpdateDeployment,
×
717
        }); err != nil {
×
718
                util.LogFatalAndExit(err, "failed to add deployment event handler")
719
        }
×
720

×
721
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
722
                AddFunc:    controller.enqueueAddVpc,
×
723
                UpdateFunc: controller.enqueueUpdateVpc,
×
724
                DeleteFunc: controller.enqueueDelVpc,
×
725
        }); err != nil {
×
726
                util.LogFatalAndExit(err, "failed to add vpc event handler")
727
        }
×
728

×
729
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
730
                AddFunc:    controller.enqueueAddVpcNatGw,
×
731
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
732
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
733
        }); err != nil {
×
734
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
735
        }
×
736

×
737
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
738
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
739
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
740
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
741
        }); err != nil {
×
742
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
743
        }
×
744

×
745
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
746
                AddFunc:    controller.enqueueAddSubnet,
×
747
                UpdateFunc: controller.enqueueUpdateSubnet,
×
748
                DeleteFunc: controller.enqueueDeleteSubnet,
×
749
        }); err != nil {
×
750
                util.LogFatalAndExit(err, "failed to add subnet event handler")
751
        }
×
752

×
753
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
754
                AddFunc:    controller.enqueueAddIPPool,
×
755
                UpdateFunc: controller.enqueueUpdateIPPool,
×
756
                DeleteFunc: controller.enqueueDeleteIPPool,
×
757
        }); err != nil {
×
758
                util.LogFatalAndExit(err, "failed to add ippool event handler")
759
        }
×
760

×
761
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
762
                AddFunc:    controller.enqueueAddIP,
×
763
                UpdateFunc: controller.enqueueUpdateIP,
×
764
                DeleteFunc: controller.enqueueDelIP,
×
765
        }); err != nil {
×
766
                util.LogFatalAndExit(err, "failed to add ips event handler")
767
        }
×
768

×
769
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
770
                AddFunc:    controller.enqueueAddVlan,
×
771
                DeleteFunc: controller.enqueueDelVlan,
×
772
                UpdateFunc: controller.enqueueUpdateVlan,
×
773
        }); err != nil {
×
774
                util.LogFatalAndExit(err, "failed to add vlan event handler")
775
        }
×
776

×
777
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
778
                AddFunc:    controller.enqueueAddSg,
×
779
                DeleteFunc: controller.enqueueDeleteSg,
×
780
                UpdateFunc: controller.enqueueUpdateSg,
×
781
        }); err != nil {
×
782
                util.LogFatalAndExit(err, "failed to add security group event handler")
783
        }
×
784

×
785
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
786
                AddFunc:    controller.enqueueAddVirtualIP,
×
787
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
788
                DeleteFunc: controller.enqueueDelVirtualIP,
×
789
        }); err != nil {
×
790
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
791
        }
×
792

×
793
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
794
                AddFunc:    controller.enqueueAddIptablesEip,
×
795
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
796
                DeleteFunc: controller.enqueueDelIptablesEip,
×
797
        }); err != nil {
×
798
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
799
        }
×
800

×
801
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
802
                AddFunc:    controller.enqueueAddIptablesFip,
×
803
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
804
                DeleteFunc: controller.enqueueDelIptablesFip,
×
805
        }); err != nil {
×
806
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
807
        }
×
808

×
809
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
810
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
811
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
812
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
813
        }); err != nil {
×
814
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
815
        }
×
816

×
817
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
818
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
819
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
820
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
821
        }); err != nil {
×
822
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
823
        }
×
824

×
825
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
826
                AddFunc:    controller.enqueueAddOvnEip,
×
827
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
828
                DeleteFunc: controller.enqueueDelOvnEip,
×
829
        }); err != nil {
×
830
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
831
        }
×
832

×
833
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
834
                AddFunc:    controller.enqueueAddOvnFip,
×
835
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
836
                DeleteFunc: controller.enqueueDelOvnFip,
×
837
        }); err != nil {
×
838
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
839
        }
×
840

×
841
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
842
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
843
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
844
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
845
        }); err != nil {
×
846
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
847
        }
×
848

×
849
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
850
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
851
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
852
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
853
        }); err != nil {
×
854
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
855
        }
×
856

×
857
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
858
                AddFunc:    controller.enqueueAddQoSPolicy,
×
859
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
860
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
861
        }); err != nil {
×
862
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
863
        }
864

×
865
        if config.EnableLb {
×
866
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
867
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
868
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
869
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
870
                }); err != nil {
×
871
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
872
                }
873

×
874
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
875
                        AddFunc:    controller.enqueueAddVpcDNS,
×
876
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
877
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
878
                }); err != nil {
×
879
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
880
                }
×
881
        }
882

883
        if config.EnableNP {
×
884
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
885
                        AddFunc:    controller.enqueueAddNp,
×
886
                        UpdateFunc: controller.enqueueUpdateNp,
×
887
                        DeleteFunc: controller.enqueueDeleteNp,
×
888
                }); err != nil {
×
889
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
890
                }
×
891
        }
892

×
893
        if config.EnableANP {
×
894
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
895
                        AddFunc:    controller.enqueueAddAnp,
×
896
                        UpdateFunc: controller.enqueueUpdateAnp,
×
897
                        DeleteFunc: controller.enqueueDeleteAnp,
×
898
                }); err != nil {
×
899
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
900
                }
×
901

×
902
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
903
                        AddFunc:    controller.enqueueAddBanp,
904
                        UpdateFunc: controller.enqueueUpdateBanp,
×
905
                        DeleteFunc: controller.enqueueDeleteBanp,
×
906
                }); err != nil {
×
907
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
908
                }
×
909

×
910
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
911
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
912
        }
913

914
        if config.EnableOVNIPSec {
×
915
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
916
                        AddFunc:    controller.enqueueAddCsr,
×
917
                        UpdateFunc: controller.enqueueUpdateCsr,
×
918
                        // no need to add delete func for csr
×
919
                }); err != nil {
×
920
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
921
                }
×
922
        }
923

924
        if config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration {
×
925
                if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
926
                        AddFunc:    controller.enqueueAddVMIMigration,
927
                        UpdateFunc: controller.enqueueUpdateVMIMigration,
928
                }); err != nil {
929
                        util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
930
                }
931
        }
×
932

×
933
        controller.Run(ctx)
×
934
}
×
935

×
936
// Run will set up the event handlers for types we are interested in, as well
×
937
// as syncing informer caches and starting workers. It will block until stopCh
938
// is closed, at which point it will shutdown the workqueue and wait for
×
939
// workers to finish processing their current work items.
×
940
func (c *Controller) Run(ctx context.Context) {
×
941
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
942
        // Otherwise, the init process should be placed after all workers have already started working
×
943
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
944
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
945
        }
946

×
947
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
948
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
949
        }
950

×
951
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
952
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
953
        }
954

×
955
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
956
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
957
        }
958

959
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
960
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
961
        }
×
962

963
        if err := c.InitOVN(); err != nil {
×
964
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
965
        }
×
966

967
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
×
968
        if err := c.syncIPCR(); err != nil {
×
969
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
970
        }
971

×
972
        if err := c.syncFinalizers(); err != nil {
×
973
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
974
        }
975

×
976
        if err := c.InitIPAM(); err != nil {
×
977
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
978
        }
979

×
980
        if err := c.syncNodeRoutes(); err != nil {
×
981
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
982
        }
983

×
984
        if err := c.syncSubnetCR(); err != nil {
×
985
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
986
        }
×
987

988
        if err := c.syncVlanCR(); err != nil {
UNCOV
989
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
990
        }
×
991

×
992
        if c.config.EnableOVNIPSec {
×
993
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
994
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
UNCOV
995
                }
×
UNCOV
996
        }
×
997

998
        c.handleUpgrading()
NEW
999

×
1000
        // start workers to do all the network operations
×
1001
        c.startWorkers(ctx)
×
1002

×
1003
        c.initResourceOnce()
×
1004
        <-ctx.Done()
×
1005
        klog.Info("Shutting down workers")
×
1006
}
×
1007

×
1008
func (c *Controller) shutdown() {
×
1009
        utilruntime.HandleCrash()
×
1010

×
1011
        c.addOrUpdatePodQueue.ShutDown()
×
1012
        c.deletePodQueue.ShutDown()
×
1013
        c.updatePodSecurityQueue.ShutDown()
×
1014

×
1015
        c.addNamespaceQueue.ShutDown()
×
1016

×
1017
        c.addOrUpdateSubnetQueue.ShutDown()
×
1018
        c.deleteSubnetQueue.ShutDown()
×
1019
        c.updateSubnetStatusQueue.ShutDown()
×
1020
        c.syncVirtualPortsQueue.ShutDown()
×
1021

×
1022
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1023
        c.updateIPPoolStatusQueue.ShutDown()
×
1024
        c.deleteIPPoolQueue.ShutDown()
×
1025

×
1026
        c.addNodeQueue.ShutDown()
×
1027
        c.updateNodeQueue.ShutDown()
×
1028
        c.deleteNodeQueue.ShutDown()
×
1029

×
1030
        c.addServiceQueue.ShutDown()
×
1031
        c.deleteServiceQueue.ShutDown()
×
1032
        c.updateServiceQueue.ShutDown()
×
1033
        c.addOrUpdateEndpointQueue.ShutDown()
×
1034

×
1035
        c.addVlanQueue.ShutDown()
×
1036
        c.delVlanQueue.ShutDown()
×
1037
        c.updateVlanQueue.ShutDown()
×
1038

×
1039
        c.addOrUpdateVpcQueue.ShutDown()
×
1040
        c.updateVpcStatusQueue.ShutDown()
×
1041
        c.delVpcQueue.ShutDown()
×
1042

×
1043
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1044
        c.initVpcNatGatewayQueue.ShutDown()
×
1045
        c.delVpcNatGatewayQueue.ShutDown()
×
1046
        c.updateVpcEipQueue.ShutDown()
×
1047
        c.updateVpcFloatingIPQueue.ShutDown()
×
1048
        c.updateVpcDnatQueue.ShutDown()
×
1049
        c.updateVpcSnatQueue.ShutDown()
×
1050
        c.updateVpcSubnetQueue.ShutDown()
×
1051

×
1052
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1053
        c.delVpcEgressGatewayQueue.ShutDown()
×
1054

1055
        if c.config.EnableLb {
×
1056
                c.addSwitchLBRuleQueue.ShutDown()
×
1057
                c.delSwitchLBRuleQueue.ShutDown()
×
1058
                c.updateSwitchLBRuleQueue.ShutDown()
×
1059

×
1060
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1061
                c.delVpcDNSQueue.ShutDown()
×
1062
        }
×
1063

×
1064
        c.addIPQueue.ShutDown()
×
1065
        c.updateIPQueue.ShutDown()
×
1066
        c.delIPQueue.ShutDown()
×
1067

×
1068
        c.addVirtualIPQueue.ShutDown()
×
1069
        c.updateVirtualIPQueue.ShutDown()
×
1070
        c.updateVirtualParentsQueue.ShutDown()
×
1071
        c.delVirtualIPQueue.ShutDown()
×
1072

×
1073
        c.addIptablesEipQueue.ShutDown()
×
1074
        c.updateIptablesEipQueue.ShutDown()
×
1075
        c.resetIptablesEipQueue.ShutDown()
×
1076
        c.delIptablesEipQueue.ShutDown()
×
1077

×
1078
        c.addIptablesFipQueue.ShutDown()
×
1079
        c.updateIptablesFipQueue.ShutDown()
×
1080
        c.delIptablesFipQueue.ShutDown()
×
1081

×
1082
        c.addIptablesDnatRuleQueue.ShutDown()
×
1083
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1084
        c.delIptablesDnatRuleQueue.ShutDown()
×
1085

×
1086
        c.addIptablesSnatRuleQueue.ShutDown()
×
1087
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1088
        c.delIptablesSnatRuleQueue.ShutDown()
×
1089

×
1090
        c.addQoSPolicyQueue.ShutDown()
×
1091
        c.updateQoSPolicyQueue.ShutDown()
×
1092
        c.delQoSPolicyQueue.ShutDown()
×
1093

×
1094
        c.addOvnEipQueue.ShutDown()
×
1095
        c.updateOvnEipQueue.ShutDown()
×
1096
        c.resetOvnEipQueue.ShutDown()
×
1097
        c.delOvnEipQueue.ShutDown()
×
1098

×
1099
        c.addOvnFipQueue.ShutDown()
×
1100
        c.updateOvnFipQueue.ShutDown()
×
1101
        c.delOvnFipQueue.ShutDown()
×
1102

×
1103
        c.addOvnSnatRuleQueue.ShutDown()
×
1104
        c.updateOvnSnatRuleQueue.ShutDown()
×
1105
        c.delOvnSnatRuleQueue.ShutDown()
×
1106

×
1107
        c.addOvnDnatRuleQueue.ShutDown()
×
1108
        c.updateOvnDnatRuleQueue.ShutDown()
×
1109
        c.delOvnDnatRuleQueue.ShutDown()
×
1110

×
1111
        if c.config.EnableNP {
×
1112
                c.updateNpQueue.ShutDown()
×
1113
                c.deleteNpQueue.ShutDown()
×
1114
        }
×
1115
        if c.config.EnableANP {
1116
                c.addAnpQueue.ShutDown()
×
1117
                c.updateAnpQueue.ShutDown()
×
1118
                c.deleteAnpQueue.ShutDown()
×
1119

×
1120
                c.addBanpQueue.ShutDown()
×
1121
                c.updateBanpQueue.ShutDown()
×
1122
                c.deleteBanpQueue.ShutDown()
×
1123
        }
×
1124

×
1125
        c.addOrUpdateSgQueue.ShutDown()
1126
        c.delSgQueue.ShutDown()
1127
        c.syncSgPortsQueue.ShutDown()
×
1128

×
1129
        c.addOrUpdateCsrQueue.ShutDown()
×
1130

×
1131
        if c.config.EnableLiveMigrationOptimize {
×
1132
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1133
        }
×
1134
}
×
1135

×
NEW
1136
func (c *Controller) handleUpgrading() {
×
NEW
1137
        klog.Info("Start upgrading")
×
NEW
1138

×
NEW
1139
        if err := c.upgradeSecurityGroups(); err != nil {
×
1140
                util.LogFatalAndExit(err, "failed to upgrade security groups")
NEW
1141
        }
×
NEW
1142
        if err := c.upgradeSubnets(); err != nil {
×
NEW
1143
                util.LogFatalAndExit(err, "failed to upgrade subnets")
×
1144
        }
1145
        if c.config.EnableNP {
NEW
1146
                if err := c.upgradeNetworkPolicies(); err != nil {
×
NEW
1147
                        util.LogFatalAndExit(err, "failed to upgrade network policies")
×
NEW
1148
                }
×
NEW
1149
        }
×
NEW
1150
        if err := c.upgradeNodes(); err != nil {
×
NEW
1151
                util.LogFatalAndExit(err, "failed to upgrade nodes")
×
NEW
1152
        }
×
NEW
1153
}
×
NEW
1154

×
1155
func (c *Controller) startWorkers(ctx context.Context) {
×
1156
        klog.Info("Starting workers")
×
1157

×
1158
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1159
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1160
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1161

×
1162
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1163
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1164
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1165
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1166
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1167
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1168
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1169
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1170
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1171
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1172
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1173
        // add default and join subnet and wait them ready
×
1174
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1175
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1176
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1177
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1178
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
1179
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1180
                klog.Infof("wait for subnets %v ready", subnets)
×
1181

×
1182
                return c.allSubnetReady(subnets...)
×
1183
        })
×
1184
        if err != nil {
×
1185
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1186
        }
×
1187

×
1188
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1189
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1190
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1191

×
1192
        // run node worker before handle any pods
×
1193
        for i := 0; i < c.config.WorkerNum; i++ {
×
1194
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1195
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1196
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1197
        }
×
1198
        for {
×
1199
                ready := true
×
1200
                time.Sleep(3 * time.Second)
×
1201
                nodes, err := c.nodesLister.List(labels.Everything())
1202
                if err != nil {
1203
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1204
                }
×
1205
                for _, node := range nodes {
1206
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
1207
                                klog.Infof("wait node %s annotation ready", node.Name)
1208
                                ready = false
×
1209
                                break
×
1210
                        }
×
1211
                }
×
1212
                if ready {
×
1213
                        break
×
1214
                }
×
1215
        }
×
1216

×
1217
        if c.config.EnableLb {
×
1218
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1219
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1220
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1221

×
1222
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
1223
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
1224
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1225

×
1226
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1227
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1228
                go wait.Until(func() {
×
1229
                        c.resyncVpcDNSConfig()
×
1230
                }, 5*time.Second, ctx.Done())
×
1231
        }
×
1232

×
1233
        for i := 0; i < c.config.WorkerNum; i++ {
×
1234
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1235
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1236
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1237

×
1238
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1239
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
1240
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1241
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1242
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1243

×
1244
                if c.config.EnableLb {
1245
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1246
                        go wait.Until(runWorker("add/update endpoint", c.addOrUpdateEndpointQueue, c.handleUpdateEndpoint), time.Second, ctx.Done())
×
1247
                }
1248

1249
                if c.config.EnableNP {
×
1250
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1251
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1252
                }
×
1253

×
1254
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
1255
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
1256
        }
×
1257

1258
        if c.config.EnableEipSnat {
1259
                go wait.Until(func() {
×
1260
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1261
                        c.resyncExternalGateway()
×
1262
                }, time.Second, ctx.Done())
1263

×
1264
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
×
1265
                c.OVNNbClient.MonitorBFD()
×
1266
        }
1267
        // TODO: we should merge these two vpc nat config into one config and resync them together
×
1268
        go wait.Until(func() {
×
1269
                c.resyncVpcNatGwConfig()
×
1270
        }, time.Second, ctx.Done())
×
1271

1272
        go wait.Until(func() {
1273
                c.resyncVpcNatConfig()
×
1274
        }, time.Second, ctx.Done())
×
1275

×
1276
        go wait.Until(func() {
×
1277
                if err := c.markAndCleanLSP(); err != nil {
1278
                        klog.Errorf("gc lsp error: %v", err)
1279
                }
×
1280
        }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
×
1281

×
1282
        go wait.Until(func() {
×
1283
                if err := c.inspectPod(); err != nil {
1284
                        klog.Errorf("inspection error: %v", err)
1285
                }
×
1286
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
×
1287

×
1288
        if c.config.EnableExternalVpc {
×
1289
                go wait.Until(func() {
×
1290
                        c.syncExternalVpc()
×
1291
                }, 5*time.Second, ctx.Done())
×
1292
        }
×
1293

×
1294
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1295
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1296
        go wait.Until(c.CheckGatewayReady, 5*time.Second, ctx.Done())
×
1297

×
1298
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1301
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1302

×
1303
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1305
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1306

×
1307
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1308
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1309
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
1310

×
1311
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1312
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1313
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1314

×
1315
        if c.config.EnableNP {
×
1316
                go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1317
        }
×
1318

×
1319
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1320
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1321
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1322

×
1323
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1324
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1325
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1326
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1327

×
1328
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1330
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1331
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1332

×
1333
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1334
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1335
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1336

×
1337
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1338
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1339
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1340

×
1341
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1344

×
1345
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1348

×
1349
        if c.config.EnableANP {
1350
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1351
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1352
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1353

1354
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
1355
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
1✔
1356
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
2✔
1357
        }
1✔
1358

1✔
1359
        if c.config.EnableLiveMigrationOptimize && c.hasKubevirtVMIMigration {
×
1360
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1361
        }
×
1362
}
1363

2✔
1364
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1365
        for _, lsName := range subnets {
1✔
1366
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1367
                if err != nil {
1368
                        klog.Error(err)
1✔
1369
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
1370
                }
1371

×
1372
                if !exist {
×
1373
                        return false, nil
×
1374
                }
×
1375
        }
×
1376

×
1377
        return true, nil
1378
}
×
1379

×
1380
func (c *Controller) initResourceOnce() {
×
1381
        c.registerSubnetMetrics()
×
1382

×
1383
        if err := c.initNodeChassis(); err != nil {
×
1384
                util.LogFatalAndExit(err, "failed to initialize node chassis")
1385
        }
×
1386

×
1387
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1388
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
1389
        }
×
1390
        if err := c.syncSecurityGroup(); err != nil {
×
1391
                util.LogFatalAndExit(err, "failed to sync security group")
×
1392
        }
×
1393

×
1394
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1395
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1396
        }
1397

1398
        if err := c.initVpcNatGw(); err != nil {
1399
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
1400
        }
×
1401
        if c.config.EnableLb {
×
1402
                if err := c.initVpcDNSConfig(); err != nil {
×
1403
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
1404
                }
1405
        }
×
1406

×
1407
        // remove resources in ovndb that not exist any more in kubernetes resources
×
1408
        // process gc at last in case of affecting other init process
×
1409
        if err := c.gc(); err != nil {
×
1410
                util.LogFatalAndExit(err, "failed to run gc")
1411
        }
×
1412
}
×
1413

×
1414
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1415
        item, shutdown := queue.Get()
×
1416
        if shutdown {
×
1417
                return false
×
1418
        }
×
1419

1420
        err := func(item T) error {
×
1421
                defer queue.Done(item)
×
1422
                if err := handler(item); err != nil {
×
1423
                        queue.AddRateLimited(item)
×
1424
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1425
                }
1426
                queue.Forget(item)
1427
                return nil
×
1428
        }(item)
×
1429
        if err != nil {
×
1430
                utilruntime.HandleError(err)
×
1431
                return true
×
1432
        }
×
1433
        return true
×
1434
}
×
1435

×
1436
func getWorkItemKey(obj any) string {
×
1437
        switch v := obj.(type) {
×
1438
        case string:
×
1439
                return v
×
1440
        case *vpcService:
×
1441
                return fmt.Sprintf("%s/%s", v.Svc.Namespace, v.Svc.Name)
×
1442
        case *AdminNetworkPolicyChangedDelta:
×
1443
                return v.key
×
1444
        case *SlrInfo:
1445
                return v.Name
1446
        default:
1447
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1448
                if err != nil {
×
1449
                        utilruntime.HandleError(err)
×
1450
                        return ""
×
1451
                }
1452
                return key
1453
        }
1454
}
1455

1456
// runWorker returns a function that keeps processing items from queue with
// handler, using getWorkItemKey to name items in error messages, and returns
// once processNextWorkItem reports false.
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
	return func() {
		for {
			if !processNextWorkItem(action, queue, handler, getWorkItemKey) {
				return
			}
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc