• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16460593740

23 Jul 2025 03:11AM UTC coverage: 21.539% (-0.006%) from 21.545%
16460593740

push

github

oilbeater
fix concurrent map race (#5510)

* [fix] use a map instead of modifying the informer cache

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

* [fix] change of conditions

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

* [fix] clean up cached entry

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

---------

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>
Co-authored-by: zhanglin02 <zhanglin02@kanzhun.com>
(cherry picked from commit 5f150bc48)

0 of 15 new or added lines in 3 files covered. (0.0%)

4 existing lines in 3 files now uncovered.

10504 of 48768 relevant lines covered (21.54%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.24
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        "github.com/puzpuzpuz/xsync/v4"
11
        "golang.org/x/time/rate"
12
        corev1 "k8s.io/api/core/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/apimachinery/pkg/util/wait"
17
        kubeinformers "k8s.io/client-go/informers"
18
        "k8s.io/client-go/kubernetes/scheme"
19
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
20
        appsv1 "k8s.io/client-go/listers/apps/v1"
21
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
22
        v1 "k8s.io/client-go/listers/core/v1"
23
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
24
        netv1 "k8s.io/client-go/listers/networking/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        "k8s.io/klog/v2"
29
        "k8s.io/utils/keymutex"
30
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
31
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
32
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
33

34
        "github.com/kubeovn/kube-ovn/pkg/informer"
35

36
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
37
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
38
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
39
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
40
        "github.com/kubeovn/kube-ovn/pkg/ovs"
41
        "github.com/kubeovn/kube-ovn/pkg/util"
42
)
43

44
// controllerAgentName is the component name of this controller; Run passes it
// as the EventSource Component when it creates the event recorder.
const controllerAgentName = "kube-ovn-controller"
45

46
// Short string keys (and one key prefix) shared across the controller package.
// NOTE(review): the consumers are not visible in this part of the file;
// presumably these are used as external-ID keys on OVN resources — confirm.
const (
47
        logicalSwitchKey              = "ls"
48
        logicalRouterKey              = "lr"
49
        portGroupKey                  = "pg"
50
        networkPolicyKey              = "np"
51
        sgKey                         = "sg"
52
        associatedSgKeyPrefix         = "associated_sg_"
53
        sgsKey                        = "security_groups"
54
        u2oKey                        = "u2o"
55
        adminNetworkPolicyKey         = "anp"
56
        baselineAdminNetworkPolicyKey = "banp"
57
)
58

59
// Controller is the main kube-ovn controller. It bundles the OVN northbound and
// southbound clients, the IPAM instance, and, for each Kubernetes or kube-ovn
// resource type it handles, a lister, an informer-synced check, rate-limited
// work queues and (for most types) a keyed mutex.
59
type Controller struct {
61
        config *Configuration
62

63
        ipam           *ovnipam.IPAM
64
        namedPort      *NamedPort
65
        // anpPrioNameMap and anpNamePrioMap map an int32 to a string and back.
        // NOTE(review): presumably AdminNetworkPolicy priority <-> name; confirm.
        anpPrioNameMap map[int32]string
66
        anpNamePrioMap map[string]int32
67

68
        // Clients for the OVN northbound (Nb) and southbound (Sb) databases.
        OVNNbClient ovs.NbClient
69
        OVNSbClient ovs.SbClient
70

71
        // ExternalGatewayType define external gateway type, centralized
72
        ExternalGatewayType string
73

74
        // Pod lister, sync check, work queues and keyed mutex.
        podsLister             v1.PodLister
75
        podsSynced             cache.InformerSynced
76
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
77
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
78
        // deletingPodObjMap and deletingNodeObjMap are concurrent maps from a
        // string key to a Pod/Node object.
        // NOTE(review): presumably objects pending deletion — confirm in the handlers.
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
79
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
80
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
81
        podKeyMutex            keymutex.KeyMutex
82

83
        // Vpc lister, sync check, work queues and keyed mutex.
        vpcsLister           kubeovnlister.VpcLister
84
        vpcSynced            cache.InformerSynced
85
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
86
        // vpcLastPoliciesMap is a concurrent string-to-string map.
        // NOTE(review): presumably keyed by VPC name and holding the last-seen
        // policies so the informer cache object is not modified; confirm.
        vpcLastPoliciesMap   *xsync.Map[string, string]
87
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
88
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
89
        vpcKeyMutex          keymutex.KeyMutex
90

91
        // VpcNatGateway lister, sync check, work queues and keyed mutex.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
92
        vpcNatGatewaySynced           cache.InformerSynced
93
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
94
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
95
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
96
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
97
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
98
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
99
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
100
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
101
        vpcNatGwKeyMutex              keymutex.KeyMutex
102

103
        // VpcEgressGateway lister, sync check, work queues and keyed mutex.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
104
        vpcEgressGatewaySynced           cache.InformerSynced
105
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
106
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
107
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
108

109
        // SwitchLBRule lister, sync check and work queues.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
110
        switchLBRuleSynced      cache.InformerSynced
111
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
112
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
113
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
114

115
        // VpcDns lister, sync check and work queues.
        vpcDNSLister           kubeovnlister.VpcDnsLister
116
        vpcDNSSynced           cache.InformerSynced
117
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
118
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
119

120
        // Subnet lister, sync check, work queues and keyed mutex.
        subnetsLister           kubeovnlister.SubnetLister
121
        subnetSynced            cache.InformerSynced
122
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
123
        // subnetLastVpcNameMap is a concurrent string-to-string map.
        // NOTE(review): presumably subnet name -> last-seen VPC name; confirm.
        subnetLastVpcNameMap    *xsync.Map[string, string]
124
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
125
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
126
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
127
        subnetKeyMutex          keymutex.KeyMutex
128

129
        // IPPool lister, sync check, work queues and keyed mutex.
        ippoolLister            kubeovnlister.IPPoolLister
130
        ippoolSynced            cache.InformerSynced
131
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
132
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
133
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
134
        ippoolKeyMutex          keymutex.KeyMutex
135

136
        // IP lister, sync check and work queues.
        ipsLister     kubeovnlister.IPLister
137
        ipSynced      cache.InformerSynced
138
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
139
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
140
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
141

142
        // Vip lister, sync check and work queues.
        virtualIpsLister          kubeovnlister.VipLister
143
        virtualIpsSynced          cache.InformerSynced
144
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
145
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
146
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
147
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
148

149
        // IptablesEIP lister, sync check and work queues.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
150
        iptablesEipSynced      cache.InformerSynced
151
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
152
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
153
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
154
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
155

156
        // IptablesFIPRule lister, sync check and work queues.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
157
        iptablesFipSynced      cache.InformerSynced
158
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
159
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
160
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
161

162
        // IptablesDnatRule lister, sync check and work queues.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
163
        iptablesDnatRuleSynced      cache.InformerSynced
164
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
165
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
166
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
167

168
        // IptablesSnatRule lister, sync check and work queues.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
169
        iptablesSnatRuleSynced      cache.InformerSynced
170
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
171
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
172
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
173

174
        // OvnEip lister, sync check and work queues.
        ovnEipsLister     kubeovnlister.OvnEipLister
175
        ovnEipSynced      cache.InformerSynced
176
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
177
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
178
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
179
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
180

181
        // OvnFip lister, sync check and work queues.
        ovnFipsLister     kubeovnlister.OvnFipLister
182
        ovnFipSynced      cache.InformerSynced
183
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
184
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
185
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
186

187
        // OvnSnatRule lister, sync check and work queues.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
188
        ovnSnatRuleSynced      cache.InformerSynced
189
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
190
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
191
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192

193
        // OvnDnatRule lister, sync check and work queues.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
194
        ovnDnatRuleSynced      cache.InformerSynced
195
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
196
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
197
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198

199
        // ProviderNetwork lister and sync check (no work queues are declared here).
        providerNetworksLister kubeovnlister.ProviderNetworkLister
200
        providerNetworkSynced  cache.InformerSynced
201

202
        // Vlan lister, sync check, work queues and keyed mutex.
        vlansLister     kubeovnlister.VlanLister
203
        vlanSynced      cache.InformerSynced
204
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
205
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
206
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
207
        vlanKeyMutex    keymutex.KeyMutex
208

209
        // Namespace lister, sync check, work queue and keyed mutex.
        namespacesLister  v1.NamespaceLister
210
        namespacesSynced  cache.InformerSynced
211
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
212
        nsKeyMutex        keymutex.KeyMutex
213

214
        // Node lister, sync check, work queues and keyed mutex.
        nodesLister     v1.NodeLister
215
        nodesSynced     cache.InformerSynced
216
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
217
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
218
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
219
        nodeKeyMutex    keymutex.KeyMutex
220

221
        // Service lister, sync check, work queues and keyed mutex.
        servicesLister     v1.ServiceLister
222
        serviceSynced      cache.InformerSynced
223
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
224
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
225
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
226
        svcKeyMutex        keymutex.KeyMutex
227

228
        // EndpointSlice lister, sync check, work queue and keyed mutex.
        endpointSlicesLister          discoveryv1.EndpointSliceLister
229
        endpointSlicesSynced          cache.InformerSynced
230
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
231
        epKeyMutex                    keymutex.KeyMutex
232

233
        // Deployment lister and sync check; Run builds the backing informer with
        // a label selector parsed from util.VpcEgressGatewayLabel.
        deploymentsLister appsv1.DeploymentLister
234
        deploymentsSynced cache.InformerSynced
235

236
        // NetworkPolicy lister, sync check, work queues and keyed mutex.
        npsLister     netv1.NetworkPolicyLister
237
        npsSynced     cache.InformerSynced
238
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
239
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
240
        npKeyMutex    keymutex.KeyMutex
241

242
        // SecurityGroup lister, sync check, work queues and keyed mutex.
        sgsLister          kubeovnlister.SecurityGroupLister
243
        sgSynced           cache.InformerSynced
244
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
245
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
246
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
247
        sgKeyMutex         keymutex.KeyMutex
248

249
        // QoSPolicy lister, sync check and work queues.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
250
        qosPolicySynced      cache.InformerSynced
251
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
252
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
253
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
254

255
        // ConfigMap lister and sync check; Run scopes the backing informer
        // factory to config.PodNamespace.
        configMapsLister v1.ConfigMapLister
256
        configMapsSynced cache.InformerSynced
257

258
        // AdminNetworkPolicy lister, sync check, work queues and keyed mutex.
        anpsLister     anplister.AdminNetworkPolicyLister
259
        anpsSynced     cache.InformerSynced
260
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
261
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
262
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
263
        anpKeyMutex    keymutex.KeyMutex
264

265
        // BaselineAdminNetworkPolicy lister, sync check, work queues and keyed mutex.
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
266
        banpsSynced     cache.InformerSynced
267
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
268
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
269
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
270
        banpKeyMutex    keymutex.KeyMutex
271

272
        // CertificateSigningRequest lister, sync check and work queue.
        csrLister           certListerv1.CertificateSigningRequestLister
273
        csrSynced           cache.InformerSynced
274
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
275

276
        // KubeVirt-related work queues and informer factory.
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
277
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
278
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
279

280
        // Event recorder and the shared informer factories created in Run.
        recorder               record.EventRecorder
281
        informerFactory        kubeinformers.SharedInformerFactory
282
        cmInformerFactory      kubeinformers.SharedInformerFactory
283
        deployInformerFactory  kubeinformers.SharedInformerFactory
284
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
285
        anpInformerFactory     anpinformer.SharedInformerFactory
286

287
        // Database health check
288
        dbFailureCount int
289
}
290

291
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
292
        if rateLimiter == nil {
2✔
293
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
294
        }
1✔
295
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
296
}
297

298
// Run creates and runs a new ovn controller
299
func Run(ctx context.Context, config *Configuration) {
×
300
        klog.V(4).Info("Creating event broadcaster")
×
301
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
302
        eventBroadcaster.StartLogging(klog.Infof)
×
303
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
304
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
305
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
306
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
307
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
308
        )
×
309

×
310
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
311
        if err != nil {
×
312
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
313
        }
×
314

315
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
316
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
317
                        listOption.AllowWatchBookmarks = true
×
318
                }))
×
319
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
320
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
321
                        listOption.AllowWatchBookmarks = true
×
322
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
323
        // deployment informer used to list/watch vpc egress gateway workloads
324
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
325
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
326
                        listOption.AllowWatchBookmarks = true
×
327
                        listOption.LabelSelector = selector.String()
×
328
                }))
×
329
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
330
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
331
                        listOption.AllowWatchBookmarks = true
×
332
                }))
×
333
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
334
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
335
                        listOption.AllowWatchBookmarks = true
×
336
                }))
×
337

338
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
339

×
340
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
341
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
342
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
343
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
344
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
345
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
346
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
347
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
348
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
349
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
350
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
351
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
352
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
353
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
354
        podInformer := informerFactory.Core().V1().Pods()
×
355
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
356
        nodeInformer := informerFactory.Core().V1().Nodes()
×
357
        serviceInformer := informerFactory.Core().V1().Services()
×
358
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
359
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
360
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
361
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
362
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
363
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
364
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
365
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
366
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
367
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
368
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
369
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
370
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
371
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
372

×
373
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
374
        controller := &Controller{
×
375
                config:             config,
×
376
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
377
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
378
                ipam:               ovnipam.NewIPAM(),
×
379
                namedPort:          NewNamedPort(),
×
380

×
381
                vpcsLister:           vpcInformer.Lister(),
×
382
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
383
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
NEW
384
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
385
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
386
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
387
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
388

×
389
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
390
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
391
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
392
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
393
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
394
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
395
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
396
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
397
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
398
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
399
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
400

×
401
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
402
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
403
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
404
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
405
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
406

×
407
                subnetsLister:           subnetInformer.Lister(),
×
408
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
409
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
NEW
410
                subnetLastVpcNameMap:    xsync.NewMap[string, string](),
×
411
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
412
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
413
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
414
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
415

×
416
                ippoolLister:            ippoolInformer.Lister(),
×
417
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
418
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
419
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
420
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
421
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
422

×
423
                ipsLister:     ipInformer.Lister(),
×
424
                ipSynced:      ipInformer.Informer().HasSynced,
×
425
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
426
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
427
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
428

×
429
                virtualIpsLister:          virtualIPInformer.Lister(),
×
430
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
431
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
432
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
433
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
434
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
435

×
436
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
437
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
438
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
439
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
440
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
441
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
442

×
443
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
444
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
445
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
446
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
447
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
448

×
449
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
450
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
451
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
452
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
453
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
454

×
455
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
456
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
457
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
458
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
459
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
460

×
461
                vlansLister:     vlanInformer.Lister(),
×
462
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
463
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
464
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
465
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
466
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
467

×
468
                providerNetworksLister: providerNetworkInformer.Lister(),
×
469
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
470

×
471
                podsLister:          podInformer.Lister(),
×
472
                podsSynced:          podInformer.Informer().HasSynced,
×
473
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
474
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
475
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
476
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
477
                                Name:          "DeletePod",
×
478
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
479
                        },
×
480
                ),
×
481
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
482
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
483

×
484
                namespacesLister:  namespaceInformer.Lister(),
×
485
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
486
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
487
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
488

×
489
                nodesLister:     nodeInformer.Lister(),
×
490
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
491
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
492
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
493
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
494
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
495

×
496
                servicesLister:     serviceInformer.Lister(),
×
497
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
498
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
499
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
500
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
501
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
502

×
503
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
504
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
505
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
506
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
507

×
508
                deploymentsLister: deploymentInformer.Lister(),
×
509
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
510

×
511
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
512
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
513
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
514
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
515
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
516

×
517
                configMapsLister: configMapInformer.Lister(),
×
518
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
519

×
520
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
521
                sgsLister:          sgInformer.Lister(),
×
522
                sgSynced:           sgInformer.Informer().HasSynced,
×
523
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
524
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
525
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
526

×
527
                ovnEipsLister:     ovnEipInformer.Lister(),
×
528
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
529
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
530
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
531
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
532
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
533

×
534
                ovnFipsLister:     ovnFipInformer.Lister(),
×
535
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
536
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
537
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
538
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
539

×
540
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
541
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
542
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
543
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
544
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
545

×
546
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
547
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
548
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
549
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
550
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
551

×
552
                csrLister:           csrInformer.Lister(),
×
553
                csrSynced:           csrInformer.Informer().HasSynced,
×
554
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
555

×
556
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
557
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
558
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
559

×
560
                recorder:               recorder,
×
561
                informerFactory:        informerFactory,
×
562
                cmInformerFactory:      cmInformerFactory,
×
563
                deployInformerFactory:  deployInformerFactory,
×
564
                kubeovnInformerFactory: kubeovnInformerFactory,
×
565
                anpInformerFactory:     anpInformerFactory,
×
566
        }
×
567

×
568
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
569
                config.OvnNbAddr,
×
570
                config.OvnTimeout,
×
571
                config.OvsDbConnectTimeout,
×
572
                config.OvsDbInactivityTimeout,
×
573
                config.OvsDbConnectMaxRetry,
×
574
        ); err != nil {
×
575
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
576
        }
×
577
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
578
                config.OvnSbAddr,
×
579
                config.OvnTimeout,
×
580
                config.OvsDbConnectTimeout,
×
581
                config.OvsDbInactivityTimeout,
×
582
                config.OvsDbConnectMaxRetry,
×
583
        ); err != nil {
×
584
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
585
        }
×
586
        if config.EnableLb {
×
587
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
588
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
589
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
590
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
591
                        "DeleteSwitchLBRule",
×
592
                        workqueue.NewTypedMaxOfRateLimiter(
×
593
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
594
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
595
                        ),
×
596
                )
×
597
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
598
                        "UpdateSwitchLBRule",
×
599
                        workqueue.NewTypedMaxOfRateLimiter(
×
600
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
601
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
602
                        ),
×
603
                )
×
604

×
605
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
606
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
607
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
608
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
609
        }
×
610

611
        if config.EnableNP {
×
612
                controller.npsLister = npInformer.Lister()
×
613
                controller.npsSynced = npInformer.Informer().HasSynced
×
614
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
615
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
616
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
617
        }
×
618

619
        if config.EnableANP {
×
620
                controller.anpsLister = anpInformer.Lister()
×
621
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
622
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
623
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
624
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
625
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
626

×
627
                controller.banpsLister = banpInformer.Lister()
×
628
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
629
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
630
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
631
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
632
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
633
        }
×
634

635
        defer controller.shutdown()
×
636
        klog.Info("Starting OVN controller")
×
637

×
638
        // Wait for the caches to be synced before starting workers
×
639
        controller.informerFactory.Start(ctx.Done())
×
640
        controller.cmInformerFactory.Start(ctx.Done())
×
641
        controller.deployInformerFactory.Start(ctx.Done())
×
642
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
643
        controller.anpInformerFactory.Start(ctx.Done())
×
644
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
645

×
646
        klog.Info("Waiting for informer caches to sync")
×
647
        cacheSyncs := []cache.InformerSynced{
×
648
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
649
                controller.vpcSynced, controller.subnetSynced,
×
650
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
651
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
652
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
653
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
654
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
655
                controller.ovnDnatRuleSynced,
×
656
        }
×
657
        if controller.config.EnableLb {
×
658
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
659
        }
×
660
        if controller.config.EnableNP {
×
661
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
662
        }
×
663
        if controller.config.EnableANP {
×
664
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
665
        }
×
666

667
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
668
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
669
        }
×
670

671
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
672
                AddFunc:    controller.enqueueAddPod,
×
673
                DeleteFunc: controller.enqueueDeletePod,
×
674
                UpdateFunc: controller.enqueueUpdatePod,
×
675
        }); err != nil {
×
676
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
677
        }
×
678

679
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
680
                AddFunc:    controller.enqueueAddNamespace,
×
681
                UpdateFunc: controller.enqueueUpdateNamespace,
×
682
                DeleteFunc: controller.enqueueDeleteNamespace,
×
683
        }); err != nil {
×
684
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
685
        }
×
686

687
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
688
                AddFunc:    controller.enqueueAddNode,
×
689
                UpdateFunc: controller.enqueueUpdateNode,
×
690
                DeleteFunc: controller.enqueueDeleteNode,
×
691
        }); err != nil {
×
692
                util.LogFatalAndExit(err, "failed to add node event handler")
×
693
        }
×
694

695
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
696
                AddFunc:    controller.enqueueAddService,
×
697
                DeleteFunc: controller.enqueueDeleteService,
×
698
                UpdateFunc: controller.enqueueUpdateService,
×
699
        }); err != nil {
×
700
                util.LogFatalAndExit(err, "failed to add service event handler")
×
701
        }
×
702

703
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
704
                AddFunc:    controller.enqueueAddEndpointSlice,
×
705
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
706
        }); err != nil {
×
707
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
708
        }
×
709

710
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
711
                AddFunc:    controller.enqueueAddDeployment,
×
712
                UpdateFunc: controller.enqueueUpdateDeployment,
×
713
        }); err != nil {
×
714
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
715
        }
×
716

717
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
718
                AddFunc:    controller.enqueueAddVpc,
×
719
                UpdateFunc: controller.enqueueUpdateVpc,
×
720
                DeleteFunc: controller.enqueueDelVpc,
×
721
        }); err != nil {
×
722
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
723
        }
×
724

725
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
726
                AddFunc:    controller.enqueueAddVpcNatGw,
×
727
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
728
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
729
        }); err != nil {
×
730
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
731
        }
×
732

733
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
734
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
735
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
736
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
737
        }); err != nil {
×
738
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
739
        }
×
740

741
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
742
                AddFunc:    controller.enqueueAddSubnet,
×
743
                UpdateFunc: controller.enqueueUpdateSubnet,
×
744
                DeleteFunc: controller.enqueueDeleteSubnet,
×
745
        }); err != nil {
×
746
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
747
        }
×
748

749
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
750
                AddFunc:    controller.enqueueAddIPPool,
×
751
                UpdateFunc: controller.enqueueUpdateIPPool,
×
752
                DeleteFunc: controller.enqueueDeleteIPPool,
×
753
        }); err != nil {
×
754
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
755
        }
×
756

757
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
758
                AddFunc:    controller.enqueueAddIP,
×
759
                UpdateFunc: controller.enqueueUpdateIP,
×
760
                DeleteFunc: controller.enqueueDelIP,
×
761
        }); err != nil {
×
762
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
763
        }
×
764

765
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
766
                AddFunc:    controller.enqueueAddVlan,
×
767
                DeleteFunc: controller.enqueueDelVlan,
×
768
                UpdateFunc: controller.enqueueUpdateVlan,
×
769
        }); err != nil {
×
770
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
771
        }
×
772

773
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
774
                AddFunc:    controller.enqueueAddSg,
×
775
                DeleteFunc: controller.enqueueDeleteSg,
×
776
                UpdateFunc: controller.enqueueUpdateSg,
×
777
        }); err != nil {
×
778
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
779
        }
×
780

781
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
782
                AddFunc:    controller.enqueueAddVirtualIP,
×
783
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
784
                DeleteFunc: controller.enqueueDelVirtualIP,
×
785
        }); err != nil {
×
786
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
787
        }
×
788

789
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
790
                AddFunc:    controller.enqueueAddIptablesEip,
×
791
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
792
                DeleteFunc: controller.enqueueDelIptablesEip,
×
793
        }); err != nil {
×
794
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
795
        }
×
796

797
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
798
                AddFunc:    controller.enqueueAddIptablesFip,
×
799
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
800
                DeleteFunc: controller.enqueueDelIptablesFip,
×
801
        }); err != nil {
×
802
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
803
        }
×
804

805
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
806
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
807
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
808
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
809
        }); err != nil {
×
810
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
811
        }
×
812

813
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
814
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
815
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
816
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
817
        }); err != nil {
×
818
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
819
        }
×
820

821
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
822
                AddFunc:    controller.enqueueAddOvnEip,
×
823
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
824
                DeleteFunc: controller.enqueueDelOvnEip,
×
825
        }); err != nil {
×
826
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
827
        }
×
828

829
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
830
                AddFunc:    controller.enqueueAddOvnFip,
×
831
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
832
                DeleteFunc: controller.enqueueDelOvnFip,
×
833
        }); err != nil {
×
834
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
835
        }
×
836

837
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
838
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
839
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
840
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
841
        }); err != nil {
×
842
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
843
        }
×
844

845
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
846
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
847
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
848
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
849
        }); err != nil {
×
850
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
851
        }
×
852

853
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
854
                AddFunc:    controller.enqueueAddQoSPolicy,
×
855
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
856
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
857
        }); err != nil {
×
858
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
859
        }
×
860

861
        if config.EnableLb {
×
862
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
863
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
864
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
865
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
866
                }); err != nil {
×
867
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
868
                }
×
869

870
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
871
                        AddFunc:    controller.enqueueAddVpcDNS,
×
872
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
873
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
874
                }); err != nil {
×
875
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
876
                }
×
877
        }
878

879
        if config.EnableNP {
×
880
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                        AddFunc:    controller.enqueueAddNp,
×
882
                        UpdateFunc: controller.enqueueUpdateNp,
×
883
                        DeleteFunc: controller.enqueueDeleteNp,
×
884
                }); err != nil {
×
885
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
886
                }
×
887
        }
888

889
        if config.EnableANP {
×
890
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
891
                        AddFunc:    controller.enqueueAddAnp,
×
892
                        UpdateFunc: controller.enqueueUpdateAnp,
×
893
                        DeleteFunc: controller.enqueueDeleteAnp,
×
894
                }); err != nil {
×
895
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
896
                }
×
897

898
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
899
                        AddFunc:    controller.enqueueAddBanp,
×
900
                        UpdateFunc: controller.enqueueUpdateBanp,
×
901
                        DeleteFunc: controller.enqueueDeleteBanp,
×
902
                }); err != nil {
×
903
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
904
                }
×
905

906
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
907
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
908
        }
909

910
        if config.EnableOVNIPSec {
×
911
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
912
                        AddFunc:    controller.enqueueAddCsr,
×
913
                        UpdateFunc: controller.enqueueUpdateCsr,
×
914
                        // no need to add delete func for csr
×
915
                }); err != nil {
×
916
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
917
                }
×
918
        }
919

920
        controller.Run(ctx)
×
921
}
922

923
// Run will set up the event handlers for types we are interested in, as well
924
// as syncing informer caches and starting workers. It will block until stopCh
925
// is closed, at which point it will shutdown the workqueue and wait for
926
// workers to finish processing their current work items.
927
func (c *Controller) Run(ctx context.Context) {
×
928
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
929
        // Otherwise, the init process should be placed after all workers have already started working
×
930
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
931
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
932
        }
×
933

934
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
935
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
936
        }
×
937

938
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
939
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
940
        }
×
941

942
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
943
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
944
        }
×
945

946
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
947
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
948
        }
×
949

950
        if err := c.InitOVN(); err != nil {
×
951
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
952
        }
×
953

954
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
955
        if err := c.syncIPCR(); err != nil {
×
956
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
957
        }
×
958

959
        if err := c.syncFinalizers(); err != nil {
×
960
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
961
        }
×
962

963
        if err := c.InitIPAM(); err != nil {
×
964
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
965
        }
×
966

967
        if err := c.syncNodeRoutes(); err != nil {
×
968
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
969
        }
×
970

971
        if err := c.syncSubnetCR(); err != nil {
×
972
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
973
        }
×
974

975
        if err := c.syncVlanCR(); err != nil {
×
976
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
977
        }
×
978

979
        if c.config.EnableOVNIPSec {
×
980
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
981
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
982
                }
×
983
        }
984

985
        // start workers to do all the network operations
986
        c.startWorkers(ctx)
×
987

×
988
        c.initResourceOnce()
×
989
        <-ctx.Done()
×
990
        klog.Info("Shutting down workers")
×
991
}
992

993
func (c *Controller) dbStatus() {
×
994
        const maxFailures = 5
×
995

×
996
        done := make(chan error, 2)
×
997
        go func() {
×
998
                done <- c.OVNNbClient.Echo(context.Background())
×
999
        }()
×
1000
        go func() {
×
1001
                done <- c.OVNSbClient.Echo(context.Background())
×
1002
        }()
×
1003

1004
        resultsReceived := 0
×
1005
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1006

×
1007
        for resultsReceived < 2 {
×
1008
                select {
×
1009
                case err := <-done:
×
1010
                        resultsReceived++
×
1011
                        if err != nil {
×
1012
                                c.dbFailureCount++
×
1013
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1014
                                if c.dbFailureCount >= maxFailures {
×
1015
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1016
                                }
×
1017
                                return
×
1018
                        }
1019
                case <-timeout:
×
1020
                        c.dbFailureCount++
×
1021
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1022
                        if c.dbFailureCount >= maxFailures {
×
1023
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1024
                        }
×
1025
                        return
×
1026
                }
1027
        }
1028

1029
        if c.dbFailureCount > 0 {
×
1030
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1031
                c.dbFailureCount = 0
×
1032
        }
×
1033
}
1034

1035
func (c *Controller) shutdown() {
×
1036
        utilruntime.HandleCrash()
×
1037

×
1038
        c.addOrUpdatePodQueue.ShutDown()
×
1039
        c.deletePodQueue.ShutDown()
×
1040
        c.updatePodSecurityQueue.ShutDown()
×
1041

×
1042
        c.addNamespaceQueue.ShutDown()
×
1043

×
1044
        c.addOrUpdateSubnetQueue.ShutDown()
×
1045
        c.deleteSubnetQueue.ShutDown()
×
1046
        c.updateSubnetStatusQueue.ShutDown()
×
1047
        c.syncVirtualPortsQueue.ShutDown()
×
1048

×
1049
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1050
        c.updateIPPoolStatusQueue.ShutDown()
×
1051
        c.deleteIPPoolQueue.ShutDown()
×
1052

×
1053
        c.addNodeQueue.ShutDown()
×
1054
        c.updateNodeQueue.ShutDown()
×
1055
        c.deleteNodeQueue.ShutDown()
×
1056

×
1057
        c.addServiceQueue.ShutDown()
×
1058
        c.deleteServiceQueue.ShutDown()
×
1059
        c.updateServiceQueue.ShutDown()
×
1060
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1061

×
1062
        c.addVlanQueue.ShutDown()
×
1063
        c.delVlanQueue.ShutDown()
×
1064
        c.updateVlanQueue.ShutDown()
×
1065

×
1066
        c.addOrUpdateVpcQueue.ShutDown()
×
1067
        c.updateVpcStatusQueue.ShutDown()
×
1068
        c.delVpcQueue.ShutDown()
×
1069

×
1070
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1071
        c.initVpcNatGatewayQueue.ShutDown()
×
1072
        c.delVpcNatGatewayQueue.ShutDown()
×
1073
        c.updateVpcEipQueue.ShutDown()
×
1074
        c.updateVpcFloatingIPQueue.ShutDown()
×
1075
        c.updateVpcDnatQueue.ShutDown()
×
1076
        c.updateVpcSnatQueue.ShutDown()
×
1077
        c.updateVpcSubnetQueue.ShutDown()
×
1078

×
1079
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1080
        c.delVpcEgressGatewayQueue.ShutDown()
×
1081

×
1082
        if c.config.EnableLb {
×
1083
                c.addSwitchLBRuleQueue.ShutDown()
×
1084
                c.delSwitchLBRuleQueue.ShutDown()
×
1085
                c.updateSwitchLBRuleQueue.ShutDown()
×
1086

×
1087
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1088
                c.delVpcDNSQueue.ShutDown()
×
1089
        }
×
1090

1091
        c.addIPQueue.ShutDown()
×
1092
        c.updateIPQueue.ShutDown()
×
1093
        c.delIPQueue.ShutDown()
×
1094

×
1095
        c.addVirtualIPQueue.ShutDown()
×
1096
        c.updateVirtualIPQueue.ShutDown()
×
1097
        c.updateVirtualParentsQueue.ShutDown()
×
1098
        c.delVirtualIPQueue.ShutDown()
×
1099

×
1100
        c.addIptablesEipQueue.ShutDown()
×
1101
        c.updateIptablesEipQueue.ShutDown()
×
1102
        c.resetIptablesEipQueue.ShutDown()
×
1103
        c.delIptablesEipQueue.ShutDown()
×
1104

×
1105
        c.addIptablesFipQueue.ShutDown()
×
1106
        c.updateIptablesFipQueue.ShutDown()
×
1107
        c.delIptablesFipQueue.ShutDown()
×
1108

×
1109
        c.addIptablesDnatRuleQueue.ShutDown()
×
1110
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1111
        c.delIptablesDnatRuleQueue.ShutDown()
×
1112

×
1113
        c.addIptablesSnatRuleQueue.ShutDown()
×
1114
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1115
        c.delIptablesSnatRuleQueue.ShutDown()
×
1116

×
1117
        c.addQoSPolicyQueue.ShutDown()
×
1118
        c.updateQoSPolicyQueue.ShutDown()
×
1119
        c.delQoSPolicyQueue.ShutDown()
×
1120

×
1121
        c.addOvnEipQueue.ShutDown()
×
1122
        c.updateOvnEipQueue.ShutDown()
×
1123
        c.resetOvnEipQueue.ShutDown()
×
1124
        c.delOvnEipQueue.ShutDown()
×
1125

×
1126
        c.addOvnFipQueue.ShutDown()
×
1127
        c.updateOvnFipQueue.ShutDown()
×
1128
        c.delOvnFipQueue.ShutDown()
×
1129

×
1130
        c.addOvnSnatRuleQueue.ShutDown()
×
1131
        c.updateOvnSnatRuleQueue.ShutDown()
×
1132
        c.delOvnSnatRuleQueue.ShutDown()
×
1133

×
1134
        c.addOvnDnatRuleQueue.ShutDown()
×
1135
        c.updateOvnDnatRuleQueue.ShutDown()
×
1136
        c.delOvnDnatRuleQueue.ShutDown()
×
1137

×
1138
        if c.config.EnableNP {
×
1139
                c.updateNpQueue.ShutDown()
×
1140
                c.deleteNpQueue.ShutDown()
×
1141
        }
×
1142
        if c.config.EnableANP {
×
1143
                c.addAnpQueue.ShutDown()
×
1144
                c.updateAnpQueue.ShutDown()
×
1145
                c.deleteAnpQueue.ShutDown()
×
1146

×
1147
                c.addBanpQueue.ShutDown()
×
1148
                c.updateBanpQueue.ShutDown()
×
1149
                c.deleteBanpQueue.ShutDown()
×
1150
        }
×
1151

1152
        c.addOrUpdateSgQueue.ShutDown()
×
1153
        c.delSgQueue.ShutDown()
×
1154
        c.syncSgPortsQueue.ShutDown()
×
1155

×
1156
        c.addOrUpdateCsrQueue.ShutDown()
×
1157

×
1158
        if c.config.EnableLiveMigrationOptimize {
×
1159
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1160
        }
×
1161
}
1162

1163
func (c *Controller) startWorkers(ctx context.Context) {
×
1164
        klog.Info("Starting workers")
×
1165

×
1166
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1167
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1168
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1169

×
1170
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1171
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1172
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1173
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1174
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1175
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1176
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1177
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1178
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1179
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1180
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1181
        // add default and join subnet and wait them ready
×
1182
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1183
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1184
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1185
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1186
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1187
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1188
                klog.Infof("wait for subnets %v ready", subnets)
×
1189

×
1190
                return c.allSubnetReady(subnets...)
×
1191
        })
×
1192
        if err != nil {
×
1193
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1194
        }
×
1195

1196
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1197
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1198
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1199

×
1200
        // run node worker before handle any pods
×
1201
        for range c.config.WorkerNum {
×
1202
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1203
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1204
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1205
        }
×
1206
        for {
×
1207
                ready := true
×
1208
                time.Sleep(3 * time.Second)
×
1209
                nodes, err := c.nodesLister.List(labels.Everything())
×
1210
                if err != nil {
×
1211
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1212
                }
×
1213
                for _, node := range nodes {
×
1214
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1215
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1216
                                ready = false
×
1217
                                break
×
1218
                        }
1219
                }
1220
                if ready {
×
1221
                        break
×
1222
                }
1223
        }
1224

1225
        if c.config.EnableLb {
×
1226
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1227
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1228
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1229

×
1230
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1231
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1232
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1233

×
1234
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1235
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1236
                go wait.Until(func() {
×
1237
                        c.resyncVpcDNSConfig()
×
1238
                }, 5*time.Second, ctx.Done())
×
1239
        }
1240

1241
        for range c.config.WorkerNum {
×
1242
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1243
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1244
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1245

×
1246
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1247
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1248
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1249
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1250
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1251

×
1252
                if c.config.EnableLb {
×
1253
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1254
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1255
                }
×
1256

1257
                if c.config.EnableNP {
×
1258
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1259
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1260
                }
×
1261

1262
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1263
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1264
        }
1265

1266
        if c.config.EnableEipSnat {
×
1267
                go wait.Until(func() {
×
1268
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1269
                        c.resyncExternalGateway()
×
1270
                }, time.Second, ctx.Done())
×
1271

1272
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1273
                c.OVNNbClient.MonitorBFD()
×
1274
        }
1275
        // TODO: we should merge these two vpc nat config into one config and resync them together
1276
        go wait.Until(func() {
×
1277
                c.resyncVpcNatGwConfig()
×
1278
        }, time.Second, ctx.Done())
×
1279

1280
        go wait.Until(func() {
×
1281
                c.resyncVpcNatConfig()
×
1282
        }, time.Second, ctx.Done())
×
1283

1284
        if c.config.GCInterval != 0 {
×
1285
                go wait.Until(func() {
×
1286
                        if err := c.markAndCleanLSP(); err != nil {
×
1287
                                klog.Errorf("gc lsp error: %v", err)
×
1288
                        }
×
1289
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1290
        }
1291

1292
        go wait.Until(func() {
×
1293
                if err := c.inspectPod(); err != nil {
×
1294
                        klog.Errorf("inspection error: %v", err)
×
1295
                }
×
1296
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1297

1298
        if c.config.EnableExternalVpc {
×
1299
                go wait.Until(func() {
×
1300
                        c.syncExternalVpc()
×
1301
                }, 5*time.Second, ctx.Done())
×
1302
        }
1303

1304
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1305
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1306
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1307

×
1308
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1309
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1310
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1311
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1312

×
1313
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1314
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1315
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1316

×
1317
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1318
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1319
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1320

×
1321
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1322
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1323
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1324

×
1325
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1326

×
1327
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1330

×
1331
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1332
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1334
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1335

×
1336
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1337
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1338
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1339
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1340

×
1341
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1344

×
1345
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1348

×
1349
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1350
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1351
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1352

×
1353
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1354
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1355
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1356

×
1357
        if c.config.EnableANP {
×
1358
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1359
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1360
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1361

×
1362
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1363
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1364
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1365
        }
×
1366

1367
        if c.config.EnableLiveMigrationOptimize {
×
1368
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1369
        }
×
1370

1371
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1372

×
1373
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1374
}
1375

1376
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1377
        for _, lsName := range subnets {
2✔
1378
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1379
                if err != nil {
1✔
1380
                        klog.Error(err)
×
1381
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1382
                }
×
1383

1384
                if !exist {
2✔
1385
                        return false, nil
1✔
1386
                }
1✔
1387
        }
1388

1389
        return true, nil
1✔
1390
}
1391

1392
func (c *Controller) initResourceOnce() {
×
1393
        c.registerSubnetMetrics()
×
1394

×
1395
        if err := c.initNodeChassis(); err != nil {
×
1396
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1397
        }
×
1398

1399
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1400
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1401
        }
×
1402
        if err := c.syncSecurityGroup(); err != nil {
×
1403
                util.LogFatalAndExit(err, "failed to sync security group")
×
1404
        }
×
1405

1406
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1407
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1408
        }
×
1409

1410
        if err := c.initVpcNatGw(); err != nil {
×
1411
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1412
        }
×
1413
        if c.config.EnableLb {
×
1414
                if err := c.initVpcDNSConfig(); err != nil {
×
1415
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1416
                }
×
1417
        }
1418

1419
        // remove resources in ovndb that not exist any more in kubernetes resources
1420
        // process gc at last in case of affecting other init process
1421
        if err := c.gc(); err != nil {
×
1422
                util.LogFatalAndExit(err, "failed to run gc")
×
1423
        }
×
1424
}
1425

1426
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1427
        item, shutdown := queue.Get()
×
1428
        if shutdown {
×
1429
                return false
×
1430
        }
×
1431

1432
        err := func(item T) error {
×
1433
                defer queue.Done(item)
×
1434
                if err := handler(item); err != nil {
×
1435
                        queue.AddRateLimited(item)
×
1436
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1437
                }
×
1438
                queue.Forget(item)
×
1439
                return nil
×
1440
        }(item)
1441
        if err != nil {
×
1442
                utilruntime.HandleError(err)
×
1443
                return true
×
1444
        }
×
1445
        return true
×
1446
}
1447

1448
func getWorkItemKey(obj any) string {
×
1449
        switch v := obj.(type) {
×
1450
        case string:
×
1451
                return v
×
1452
        case *vpcService:
×
1453
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1454
        case *AdminNetworkPolicyChangedDelta:
×
1455
                return v.key
×
1456
        case *SlrInfo:
×
1457
                return v.Name
×
1458
        default:
×
1459
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1460
                if err != nil {
×
1461
                        utilruntime.HandleError(err)
×
1462
                        return ""
×
1463
                }
×
1464
                return key
×
1465
        }
1466
}
1467

1468
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1469
        return func() {
×
1470
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1471
                }
×
1472
        }
1473
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc