• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 19880282979

03 Dec 2025 02:29AM UTC coverage: 22.273% (-0.01%) from 22.284%
19880282979

push

github

oilbeater
perf: remove sleep

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

11554 of 51874 relevant lines covered (22.27%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.19
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync/atomic"
9
        "time"
10

11
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
12
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
13
        "github.com/puzpuzpuz/xsync/v4"
14
        "golang.org/x/time/rate"
15
        corev1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/labels"
18
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
19
        "k8s.io/apimachinery/pkg/util/wait"
20
        kubeinformers "k8s.io/client-go/informers"
21
        "k8s.io/client-go/kubernetes/scheme"
22
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
23
        appsv1 "k8s.io/client-go/listers/apps/v1"
24
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
25
        v1 "k8s.io/client-go/listers/core/v1"
26
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
27
        netv1 "k8s.io/client-go/listers/networking/v1"
28
        "k8s.io/client-go/tools/cache"
29
        "k8s.io/client-go/tools/record"
30
        "k8s.io/client-go/util/workqueue"
31
        "k8s.io/klog/v2"
32
        "k8s.io/utils/keymutex"
33
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
34
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
35
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
36

37
        "github.com/kubeovn/kube-ovn/pkg/informer"
38

39
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
40
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
41
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
42
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
43
        "github.com/kubeovn/kube-ovn/pkg/ovs"
44
        "github.com/kubeovn/kube-ovn/pkg/util"
45
)
46

47
const (
	// controllerAgentName is the component name set on the event source of the
	// recorder that Run creates.
	controllerAgentName = "kube-ovn-controller"

	// Short string keys used by the controller.
	// NOTE(review): where these keys are consumed is not visible in this
	// chunk — confirm against the callers before changing any value.
	logicalSwitchKey              = "ls"
	logicalRouterKey              = "lr"
	portGroupKey                  = "pg"
	networkPolicyKey              = "np"
	sgKey                         = "sg"
	associatedSgKeyPrefix         = "associated_sg_"
	sgsKey                        = "security_groups"
	u2oKey                        = "u2o"
	adminNetworkPolicyKey         = "anp"
	baselineAdminNetworkPolicyKey = "banp"
	ippoolKey                     = "ippool"
)
62

63
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
64
type Controller struct {
65
        config *Configuration
66

67
        ipam           *ovnipam.IPAM
68
        namedPort      *NamedPort
69
        anpPrioNameMap map[int32]string
70
        anpNamePrioMap map[string]int32
71

72
        OVNNbClient ovs.NbClient
73
        OVNSbClient ovs.SbClient
74

75
        // ExternalGatewayType define external gateway type, centralized
76
        ExternalGatewayType string
77

78
        podsLister             v1.PodLister
79
        podsSynced             cache.InformerSynced
80
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
81
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
82
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
83
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
84
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
85
        podKeyMutex            keymutex.KeyMutex
86

87
        vpcsLister           kubeovnlister.VpcLister
88
        vpcSynced            cache.InformerSynced
89
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
90
        vpcLastPoliciesMap   *xsync.Map[string, string]
91
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
92
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
93
        vpcKeyMutex          keymutex.KeyMutex
94

95
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
96
        vpcNatGatewaySynced           cache.InformerSynced
97
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
98
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
99
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
100
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
101
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
102
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
103
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
104
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
105
        vpcNatGwKeyMutex              keymutex.KeyMutex
106
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
107

108
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
109
        vpcEgressGatewaySynced           cache.InformerSynced
110
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
111
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
112
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
113

114
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
115
        switchLBRuleSynced      cache.InformerSynced
116
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
117
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
118
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
119

120
        vpcDNSLister           kubeovnlister.VpcDnsLister
121
        vpcDNSSynced           cache.InformerSynced
122
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
123
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
124

125
        subnetsLister           kubeovnlister.SubnetLister
126
        subnetSynced            cache.InformerSynced
127
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
128
        subnetLastVpcNameMap    *xsync.Map[string, string]
129
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
130
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
131
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
132
        subnetKeyMutex          keymutex.KeyMutex
133

134
        ippoolLister            kubeovnlister.IPPoolLister
135
        ippoolSynced            cache.InformerSynced
136
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
137
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
138
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
139
        ippoolKeyMutex          keymutex.KeyMutex
140

141
        ipsLister     kubeovnlister.IPLister
142
        ipSynced      cache.InformerSynced
143
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
144
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
145
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
146

147
        virtualIpsLister          kubeovnlister.VipLister
148
        virtualIpsSynced          cache.InformerSynced
149
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
150
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
151
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
152
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
153

154
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
155
        iptablesEipSynced      cache.InformerSynced
156
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
157
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
158
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
159
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
160

161
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
162
        iptablesFipSynced      cache.InformerSynced
163
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
164
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
165
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
166

167
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
168
        iptablesDnatRuleSynced      cache.InformerSynced
169
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
170
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
171
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
172

173
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
174
        iptablesSnatRuleSynced      cache.InformerSynced
175
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
176
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
177
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
178

179
        ovnEipsLister     kubeovnlister.OvnEipLister
180
        ovnEipSynced      cache.InformerSynced
181
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
182
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
183
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
184
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
185

186
        ovnFipsLister     kubeovnlister.OvnFipLister
187
        ovnFipSynced      cache.InformerSynced
188
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
189
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
190
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
191

192
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
193
        ovnSnatRuleSynced      cache.InformerSynced
194
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
195
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
196
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
197

198
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
199
        ovnDnatRuleSynced      cache.InformerSynced
200
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
201
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
202
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
203

204
        providerNetworksLister kubeovnlister.ProviderNetworkLister
205
        providerNetworkSynced  cache.InformerSynced
206

207
        vlansLister     kubeovnlister.VlanLister
208
        vlanSynced      cache.InformerSynced
209
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
210
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
211
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
212
        vlanKeyMutex    keymutex.KeyMutex
213

214
        namespacesLister  v1.NamespaceLister
215
        namespacesSynced  cache.InformerSynced
216
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
217
        nsKeyMutex        keymutex.KeyMutex
218

219
        nodesLister     v1.NodeLister
220
        nodesSynced     cache.InformerSynced
221
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
222
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
223
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
224
        nodeKeyMutex    keymutex.KeyMutex
225

226
        servicesLister     v1.ServiceLister
227
        serviceSynced      cache.InformerSynced
228
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
229
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
230
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
231
        svcKeyMutex        keymutex.KeyMutex
232

233
        endpointSlicesLister          discoveryv1.EndpointSliceLister
234
        endpointSlicesSynced          cache.InformerSynced
235
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
236
        epKeyMutex                    keymutex.KeyMutex
237

238
        deploymentsLister appsv1.DeploymentLister
239
        deploymentsSynced cache.InformerSynced
240

241
        npsLister     netv1.NetworkPolicyLister
242
        npsSynced     cache.InformerSynced
243
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
244
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
245
        npKeyMutex    keymutex.KeyMutex
246

247
        sgsLister          kubeovnlister.SecurityGroupLister
248
        sgSynced           cache.InformerSynced
249
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
250
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
251
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
252
        sgKeyMutex         keymutex.KeyMutex
253

254
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
255
        qosPolicySynced      cache.InformerSynced
256
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
257
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
258
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
259

260
        configMapsLister v1.ConfigMapLister
261
        configMapsSynced cache.InformerSynced
262

263
        anpsLister     anplister.AdminNetworkPolicyLister
264
        anpsSynced     cache.InformerSynced
265
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
266
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
267
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
268
        anpKeyMutex    keymutex.KeyMutex
269

270
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
271
        dnsNameResolversSynced          cache.InformerSynced
272
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
273
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
274

275
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
276
        banpsSynced     cache.InformerSynced
277
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
278
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
279
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
280
        banpKeyMutex    keymutex.KeyMutex
281

282
        csrLister           certListerv1.CertificateSigningRequestLister
283
        csrSynced           cache.InformerSynced
284
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
285

286
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
287
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
288
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
289

290
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
291
        netAttachSynced          cache.InformerSynced
292
        netAttachInformerFactory netAttach.SharedInformerFactory
293

294
        recorder               record.EventRecorder
295
        informerFactory        kubeinformers.SharedInformerFactory
296
        cmInformerFactory      kubeinformers.SharedInformerFactory
297
        deployInformerFactory  kubeinformers.SharedInformerFactory
298
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
299
        anpInformerFactory     anpinformer.SharedInformerFactory
300

301
        // Database health check
302
        dbFailureCount int
303

304
        distributedSubnetNeedSync atomic.Bool
305
}
306

307
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
308
        if rateLimiter == nil {
2✔
309
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
310
        }
1✔
311
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
312
}
313

314
// Run creates and runs a new ovn controller
315
func Run(ctx context.Context, config *Configuration) {
×
316
        klog.V(4).Info("Creating event broadcaster")
×
317
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
318
        eventBroadcaster.StartLogging(klog.Infof)
×
319
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
320
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
321
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
322
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
323
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
324
        )
×
325

×
326
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
327
        if err != nil {
×
328
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
329
        }
×
330

331
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
332
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
333
                        listOption.AllowWatchBookmarks = true
×
334
                }))
×
335
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
336
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
337
                        listOption.AllowWatchBookmarks = true
×
338
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
339
        // deployment informer used to list/watch vpc egress gateway workloads
340
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
341
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
342
                        listOption.AllowWatchBookmarks = true
×
343
                        listOption.LabelSelector = selector.String()
×
344
                }))
×
345
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
346
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
347
                        listOption.AllowWatchBookmarks = true
×
348
                }))
×
349
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
350
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
351
                        listOption.AllowWatchBookmarks = true
×
352
                }))
×
353

354
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
355
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
356
                        listOption.AllowWatchBookmarks = true
×
357
                }),
×
358
        )
359

360
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
361

×
362
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
363
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
364
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
365
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
366
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
367
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
368
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
369
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
370
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
371
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
372
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
373
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
374
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
375
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
376
        podInformer := informerFactory.Core().V1().Pods()
×
377
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
378
        nodeInformer := informerFactory.Core().V1().Nodes()
×
379
        serviceInformer := informerFactory.Core().V1().Services()
×
380
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
381
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
382
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
383
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
384
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
385
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
386
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
387
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
388
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
389
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
390
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
391
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
392
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
393
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
394
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
395
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
396

×
397
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
398
        controller := &Controller{
×
399
                config:             config,
×
400
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
401
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
402
                ipam:               ovnipam.NewIPAM(),
×
403
                namedPort:          NewNamedPort(),
×
404

×
405
                vpcsLister:           vpcInformer.Lister(),
×
406
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
407
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
408
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
409
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
410
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
411
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
412

×
413
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
414
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
415
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
416
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
417
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
418
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
419
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
420
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
421
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
422
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
423
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
424
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
425
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
426
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
427
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
428
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
429
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
430

×
431
                subnetsLister:           subnetInformer.Lister(),
×
432
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
433
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
434
                subnetLastVpcNameMap:    xsync.NewMap[string, string](),
×
435
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
436
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
437
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
438
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
439

×
440
                ippoolLister:            ippoolInformer.Lister(),
×
441
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
442
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
443
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
444
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
445
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
446

×
447
                ipsLister:     ipInformer.Lister(),
×
448
                ipSynced:      ipInformer.Informer().HasSynced,
×
449
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
450
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
451
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
452

×
453
                virtualIpsLister:          virtualIPInformer.Lister(),
×
454
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
455
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
456
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
457
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
458
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
459

×
460
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
461
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
462
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
463
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
464
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
465
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
466

×
467
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
468
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
469
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
470
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
471
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
472

×
473
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
474
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
475
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
476
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
477
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
478

×
479
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
480
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
481
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
482
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
483
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
484

×
485
                vlansLister:     vlanInformer.Lister(),
×
486
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
487
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
488
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
489
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
490
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
491

×
492
                providerNetworksLister: providerNetworkInformer.Lister(),
×
493
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
494

×
495
                podsLister:          podInformer.Lister(),
×
496
                podsSynced:          podInformer.Informer().HasSynced,
×
497
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
498
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
499
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
500
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
501
                                Name:          "DeletePod",
×
502
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
503
                        },
×
504
                ),
×
505
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
506
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
507

×
508
                namespacesLister:  namespaceInformer.Lister(),
×
509
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
510
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
511
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
512

×
513
                nodesLister:     nodeInformer.Lister(),
×
514
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
515
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
516
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
517
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
518
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
519

×
520
                servicesLister:     serviceInformer.Lister(),
×
521
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
522
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
523
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
524
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
525
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
526

×
527
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
528
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
529
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
530
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
531

×
532
                deploymentsLister: deploymentInformer.Lister(),
×
533
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
534

×
535
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
536
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
537
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
538
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
539
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
540

×
541
                configMapsLister: configMapInformer.Lister(),
×
542
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
543

×
544
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
545
                sgsLister:          sgInformer.Lister(),
×
546
                sgSynced:           sgInformer.Informer().HasSynced,
×
547
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
548
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
549
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
550

×
551
                ovnEipsLister:     ovnEipInformer.Lister(),
×
552
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
553
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
554
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
555
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
556
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
557

×
558
                ovnFipsLister:     ovnFipInformer.Lister(),
×
559
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
560
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
561
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
562
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
563

×
564
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
565
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
566
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
567
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
568
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
569

×
570
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
571
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
572
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
573
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
574
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
575

×
576
                csrLister:           csrInformer.Lister(),
×
577
                csrSynced:           csrInformer.Informer().HasSynced,
×
578
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
579

×
580
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
581
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
582
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
583

×
584
                netAttachLister:          netAttachInformer.Lister(),
×
585
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
586
                netAttachInformerFactory: attachNetInformerFactory,
×
587

×
588
                recorder:               recorder,
×
589
                informerFactory:        informerFactory,
×
590
                cmInformerFactory:      cmInformerFactory,
×
591
                deployInformerFactory:  deployInformerFactory,
×
592
                kubeovnInformerFactory: kubeovnInformerFactory,
×
593
                anpInformerFactory:     anpInformerFactory,
×
594
        }
×
595

×
596
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
597
                config.OvnNbAddr,
×
598
                config.OvnTimeout,
×
599
                config.OvsDbConnectTimeout,
×
600
                config.OvsDbInactivityTimeout,
×
601
                config.OvsDbConnectMaxRetry,
×
602
        ); err != nil {
×
603
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
604
        }
×
605
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
606
                config.OvnSbAddr,
×
607
                config.OvnTimeout,
×
608
                config.OvsDbConnectTimeout,
×
609
                config.OvsDbInactivityTimeout,
×
610
                config.OvsDbConnectMaxRetry,
×
611
        ); err != nil {
×
612
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
613
        }
×
614
        if config.EnableLb {
×
615
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
616
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
617
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
618
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
619
                        "DeleteSwitchLBRule",
×
620
                        workqueue.NewTypedMaxOfRateLimiter(
×
621
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
622
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
623
                        ),
×
624
                )
×
625
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
626
                        "UpdateSwitchLBRule",
×
627
                        workqueue.NewTypedMaxOfRateLimiter(
×
628
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
629
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
630
                        ),
×
631
                )
×
632

×
633
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
634
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
635
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
636
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
637
        }
×
638

639
        if config.EnableNP {
×
640
                controller.npsLister = npInformer.Lister()
×
641
                controller.npsSynced = npInformer.Informer().HasSynced
×
642
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
643
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
644
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
645
        }
×
646

647
        if config.EnableANP {
×
648
                controller.anpsLister = anpInformer.Lister()
×
649
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
650
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
651
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
652
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
653
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
654

×
655
                controller.banpsLister = banpInformer.Lister()
×
656
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
657
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
658
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
659
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
660
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
661
        }
×
662

663
        if config.EnableDNSNameResolver {
×
664
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
665
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
666
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
667
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
668
        }
×
669

670
        defer controller.shutdown()
×
671
        klog.Info("Starting OVN controller")
×
672

×
673
        // Wait for the caches to be synced before starting workers
×
674
        controller.informerFactory.Start(ctx.Done())
×
675
        controller.cmInformerFactory.Start(ctx.Done())
×
676
        controller.deployInformerFactory.Start(ctx.Done())
×
677
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
678
        controller.anpInformerFactory.Start(ctx.Done())
×
679
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
680
        controller.StartNetAttachInformerFactory(ctx)
×
681

×
682
        klog.Info("Waiting for informer caches to sync")
×
683
        cacheSyncs := []cache.InformerSynced{
×
684
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
685
                controller.vpcSynced, controller.subnetSynced,
×
686
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
687
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
688
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
689
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
690
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
691
                controller.ovnDnatRuleSynced,
×
692
        }
×
693
        if controller.config.EnableLb {
×
694
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
695
        }
×
696
        if controller.config.EnableNP {
×
697
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
698
        }
×
699
        if controller.config.EnableANP {
×
700
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
701
        }
×
702
        if controller.config.EnableDNSNameResolver {
×
703
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
704
        }
×
705

706
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
707
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
708
        }
×
709

710
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
711
                AddFunc:    controller.enqueueAddPod,
×
712
                DeleteFunc: controller.enqueueDeletePod,
×
713
                UpdateFunc: controller.enqueueUpdatePod,
×
714
        }); err != nil {
×
715
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
716
        }
×
717

718
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
719
                AddFunc:    controller.enqueueAddNamespace,
×
720
                UpdateFunc: controller.enqueueUpdateNamespace,
×
721
                DeleteFunc: controller.enqueueDeleteNamespace,
×
722
        }); err != nil {
×
723
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
724
        }
×
725

726
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
727
                AddFunc:    controller.enqueueAddNode,
×
728
                UpdateFunc: controller.enqueueUpdateNode,
×
729
                DeleteFunc: controller.enqueueDeleteNode,
×
730
        }); err != nil {
×
731
                util.LogFatalAndExit(err, "failed to add node event handler")
×
732
        }
×
733

734
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
735
                AddFunc:    controller.enqueueAddService,
×
736
                DeleteFunc: controller.enqueueDeleteService,
×
737
                UpdateFunc: controller.enqueueUpdateService,
×
738
        }); err != nil {
×
739
                util.LogFatalAndExit(err, "failed to add service event handler")
×
740
        }
×
741

742
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
743
                AddFunc:    controller.enqueueAddEndpointSlice,
×
744
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
745
        }); err != nil {
×
746
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
747
        }
×
748

749
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
750
                AddFunc:    controller.enqueueAddDeployment,
×
751
                UpdateFunc: controller.enqueueUpdateDeployment,
×
752
        }); err != nil {
×
753
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
754
        }
×
755

756
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
757
                AddFunc:    controller.enqueueAddVpc,
×
758
                UpdateFunc: controller.enqueueUpdateVpc,
×
759
                DeleteFunc: controller.enqueueDelVpc,
×
760
        }); err != nil {
×
761
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
762
        }
×
763

764
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
765
                AddFunc:    controller.enqueueAddVpcNatGw,
×
766
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
767
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
768
        }); err != nil {
×
769
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
770
        }
×
771

772
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
773
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
774
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
775
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
776
        }); err != nil {
×
777
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
778
        }
×
779

780
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
781
                AddFunc:    controller.enqueueAddSubnet,
×
782
                UpdateFunc: controller.enqueueUpdateSubnet,
×
783
                DeleteFunc: controller.enqueueDeleteSubnet,
×
784
        }); err != nil {
×
785
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
786
        }
×
787

788
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
789
                AddFunc:    controller.enqueueAddIPPool,
×
790
                UpdateFunc: controller.enqueueUpdateIPPool,
×
791
                DeleteFunc: controller.enqueueDeleteIPPool,
×
792
        }); err != nil {
×
793
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
794
        }
×
795

796
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
797
                AddFunc:    controller.enqueueAddIP,
×
798
                UpdateFunc: controller.enqueueUpdateIP,
×
799
                DeleteFunc: controller.enqueueDelIP,
×
800
        }); err != nil {
×
801
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
802
        }
×
803

804
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
805
                AddFunc:    controller.enqueueAddVlan,
×
806
                DeleteFunc: controller.enqueueDelVlan,
×
807
                UpdateFunc: controller.enqueueUpdateVlan,
×
808
        }); err != nil {
×
809
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
810
        }
×
811

812
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
813
                AddFunc:    controller.enqueueAddSg,
×
814
                DeleteFunc: controller.enqueueDeleteSg,
×
815
                UpdateFunc: controller.enqueueUpdateSg,
×
816
        }); err != nil {
×
817
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
818
        }
×
819

820
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
821
                AddFunc:    controller.enqueueAddVirtualIP,
×
822
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
823
                DeleteFunc: controller.enqueueDelVirtualIP,
×
824
        }); err != nil {
×
825
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
826
        }
×
827

828
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
829
                AddFunc:    controller.enqueueAddIptablesEip,
×
830
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
831
                DeleteFunc: controller.enqueueDelIptablesEip,
×
832
        }); err != nil {
×
833
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
834
        }
×
835

836
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
837
                AddFunc:    controller.enqueueAddIptablesFip,
×
838
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
839
                DeleteFunc: controller.enqueueDelIptablesFip,
×
840
        }); err != nil {
×
841
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
842
        }
×
843

844
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
845
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
846
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
847
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
848
        }); err != nil {
×
849
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
850
        }
×
851

852
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
853
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
854
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
855
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
856
        }); err != nil {
×
857
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
858
        }
×
859

860
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
861
                AddFunc:    controller.enqueueAddOvnEip,
×
862
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
863
                DeleteFunc: controller.enqueueDelOvnEip,
×
864
        }); err != nil {
×
865
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
866
        }
×
867

868
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
869
                AddFunc:    controller.enqueueAddOvnFip,
×
870
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
871
                DeleteFunc: controller.enqueueDelOvnFip,
×
872
        }); err != nil {
×
873
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
874
        }
×
875

876
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
877
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
878
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
879
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
880
        }); err != nil {
×
881
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
882
        }
×
883

884
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
885
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
886
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
887
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
888
        }); err != nil {
×
889
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
890
        }
×
891

892
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
893
                AddFunc:    controller.enqueueAddQoSPolicy,
×
894
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
895
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
896
        }); err != nil {
×
897
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
898
        }
×
899

900
        if config.EnableLb {
×
901
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
902
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
903
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
904
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
905
                }); err != nil {
×
906
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
907
                }
×
908

909
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
910
                        AddFunc:    controller.enqueueAddVpcDNS,
×
911
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
912
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
913
                }); err != nil {
×
914
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
915
                }
×
916
        }
917

918
        if config.EnableNP {
×
919
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
920
                        AddFunc:    controller.enqueueAddNp,
×
921
                        UpdateFunc: controller.enqueueUpdateNp,
×
922
                        DeleteFunc: controller.enqueueDeleteNp,
×
923
                }); err != nil {
×
924
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
925
                }
×
926
        }
927

928
        if config.EnableANP {
×
929
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                        AddFunc:    controller.enqueueAddAnp,
×
931
                        UpdateFunc: controller.enqueueUpdateAnp,
×
932
                        DeleteFunc: controller.enqueueDeleteAnp,
×
933
                }); err != nil {
×
934
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
935
                }
×
936

937
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
938
                        AddFunc:    controller.enqueueAddBanp,
×
939
                        UpdateFunc: controller.enqueueUpdateBanp,
×
940
                        DeleteFunc: controller.enqueueDeleteBanp,
×
941
                }); err != nil {
×
942
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
943
                }
×
944

945
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
946
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
947
        }
948

949
        if config.EnableDNSNameResolver {
×
950
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
951
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
952
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
953
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
954
                }); err != nil {
×
955
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
956
                }
×
957
        }
958

959
        if config.EnableOVNIPSec {
×
960
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
961
                        AddFunc:    controller.enqueueAddCsr,
×
962
                        UpdateFunc: controller.enqueueUpdateCsr,
×
963
                        // no need to add delete func for csr
×
964
                }); err != nil {
×
965
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
966
                }
×
967
        }
968

969
        controller.Run(ctx)
×
970
}
971

972
// Run will set up the event handlers for types we are interested in, as well
973
// as syncing informer caches and starting workers. It will block until stopCh
974
// is closed, at which point it will shutdown the workqueue and wait for
975
// workers to finish processing their current work items.
976
func (c *Controller) Run(ctx context.Context) {
×
977
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
978
        // Otherwise, the init process should be placed after all workers have already started working
×
979
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
980
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
981
        }
×
982

983
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
984
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
985
        }
×
986

987
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
988
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
989
        }
×
990

991
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
992
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
993
        }
×
994

995
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
996
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
997
        }
×
998

999
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1000
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1001
        }
×
1002

1003
        if err := c.InitOVN(); err != nil {
×
1004
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1005
        }
×
1006

1007
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1008
        if err := c.syncIPCR(); err != nil {
×
1009
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1010
        }
×
1011

1012
        if err := c.syncFinalizers(); err != nil {
×
1013
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1014
        }
×
1015

1016
        if err := c.InitIPAM(); err != nil {
×
1017
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1018
        }
×
1019

1020
        if err := c.syncNodeRoutes(); err != nil {
×
1021
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1022
        }
×
1023

1024
        if err := c.syncSubnetCR(); err != nil {
×
1025
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1026
        }
×
1027

1028
        if err := c.syncVlanCR(); err != nil {
×
1029
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1030
        }
×
1031

1032
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1033
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1034
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1035
                }
×
1036
        }
1037

1038
        // start workers to do all the network operations
1039
        c.startWorkers(ctx)
×
1040

×
1041
        c.initResourceOnce()
×
1042
        <-ctx.Done()
×
1043
        klog.Info("Shutting down workers")
×
1044
}
1045

1046
func (c *Controller) dbStatus() {
×
1047
        const maxFailures = 5
×
1048

×
1049
        done := make(chan error, 2)
×
1050
        go func() {
×
1051
                done <- c.OVNNbClient.Echo(context.Background())
×
1052
        }()
×
1053
        go func() {
×
1054
                done <- c.OVNSbClient.Echo(context.Background())
×
1055
        }()
×
1056

1057
        resultsReceived := 0
×
1058
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1059

×
1060
        for resultsReceived < 2 {
×
1061
                select {
×
1062
                case err := <-done:
×
1063
                        resultsReceived++
×
1064
                        if err != nil {
×
1065
                                c.dbFailureCount++
×
1066
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1067
                                if c.dbFailureCount >= maxFailures {
×
1068
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1069
                                }
×
1070
                                return
×
1071
                        }
1072
                case <-timeout:
×
1073
                        c.dbFailureCount++
×
1074
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1075
                        if c.dbFailureCount >= maxFailures {
×
1076
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1077
                        }
×
1078
                        return
×
1079
                }
1080
        }
1081

1082
        if c.dbFailureCount > 0 {
×
1083
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1084
                c.dbFailureCount = 0
×
1085
        }
×
1086
}
1087

1088
func (c *Controller) shutdown() {
×
1089
        utilruntime.HandleCrash()
×
1090

×
1091
        c.addOrUpdatePodQueue.ShutDown()
×
1092
        c.deletePodQueue.ShutDown()
×
1093
        c.updatePodSecurityQueue.ShutDown()
×
1094

×
1095
        c.addNamespaceQueue.ShutDown()
×
1096

×
1097
        c.addOrUpdateSubnetQueue.ShutDown()
×
1098
        c.deleteSubnetQueue.ShutDown()
×
1099
        c.updateSubnetStatusQueue.ShutDown()
×
1100
        c.syncVirtualPortsQueue.ShutDown()
×
1101

×
1102
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1103
        c.updateIPPoolStatusQueue.ShutDown()
×
1104
        c.deleteIPPoolQueue.ShutDown()
×
1105

×
1106
        c.addNodeQueue.ShutDown()
×
1107
        c.updateNodeQueue.ShutDown()
×
1108
        c.deleteNodeQueue.ShutDown()
×
1109

×
1110
        c.addServiceQueue.ShutDown()
×
1111
        c.deleteServiceQueue.ShutDown()
×
1112
        c.updateServiceQueue.ShutDown()
×
1113
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1114

×
1115
        c.addVlanQueue.ShutDown()
×
1116
        c.delVlanQueue.ShutDown()
×
1117
        c.updateVlanQueue.ShutDown()
×
1118

×
1119
        c.addOrUpdateVpcQueue.ShutDown()
×
1120
        c.updateVpcStatusQueue.ShutDown()
×
1121
        c.delVpcQueue.ShutDown()
×
1122

×
1123
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1124
        c.initVpcNatGatewayQueue.ShutDown()
×
1125
        c.delVpcNatGatewayQueue.ShutDown()
×
1126
        c.updateVpcEipQueue.ShutDown()
×
1127
        c.updateVpcFloatingIPQueue.ShutDown()
×
1128
        c.updateVpcDnatQueue.ShutDown()
×
1129
        c.updateVpcSnatQueue.ShutDown()
×
1130
        c.updateVpcSubnetQueue.ShutDown()
×
1131

×
1132
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1133
        c.delVpcEgressGatewayQueue.ShutDown()
×
1134

×
1135
        if c.config.EnableLb {
×
1136
                c.addSwitchLBRuleQueue.ShutDown()
×
1137
                c.delSwitchLBRuleQueue.ShutDown()
×
1138
                c.updateSwitchLBRuleQueue.ShutDown()
×
1139

×
1140
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1141
                c.delVpcDNSQueue.ShutDown()
×
1142
        }
×
1143

1144
        c.addIPQueue.ShutDown()
×
1145
        c.updateIPQueue.ShutDown()
×
1146
        c.delIPQueue.ShutDown()
×
1147

×
1148
        c.addVirtualIPQueue.ShutDown()
×
1149
        c.updateVirtualIPQueue.ShutDown()
×
1150
        c.updateVirtualParentsQueue.ShutDown()
×
1151
        c.delVirtualIPQueue.ShutDown()
×
1152

×
1153
        c.addIptablesEipQueue.ShutDown()
×
1154
        c.updateIptablesEipQueue.ShutDown()
×
1155
        c.resetIptablesEipQueue.ShutDown()
×
1156
        c.delIptablesEipQueue.ShutDown()
×
1157

×
1158
        c.addIptablesFipQueue.ShutDown()
×
1159
        c.updateIptablesFipQueue.ShutDown()
×
1160
        c.delIptablesFipQueue.ShutDown()
×
1161

×
1162
        c.addIptablesDnatRuleQueue.ShutDown()
×
1163
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1164
        c.delIptablesDnatRuleQueue.ShutDown()
×
1165

×
1166
        c.addIptablesSnatRuleQueue.ShutDown()
×
1167
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1168
        c.delIptablesSnatRuleQueue.ShutDown()
×
1169

×
1170
        c.addQoSPolicyQueue.ShutDown()
×
1171
        c.updateQoSPolicyQueue.ShutDown()
×
1172
        c.delQoSPolicyQueue.ShutDown()
×
1173

×
1174
        c.addOvnEipQueue.ShutDown()
×
1175
        c.updateOvnEipQueue.ShutDown()
×
1176
        c.resetOvnEipQueue.ShutDown()
×
1177
        c.delOvnEipQueue.ShutDown()
×
1178

×
1179
        c.addOvnFipQueue.ShutDown()
×
1180
        c.updateOvnFipQueue.ShutDown()
×
1181
        c.delOvnFipQueue.ShutDown()
×
1182

×
1183
        c.addOvnSnatRuleQueue.ShutDown()
×
1184
        c.updateOvnSnatRuleQueue.ShutDown()
×
1185
        c.delOvnSnatRuleQueue.ShutDown()
×
1186

×
1187
        c.addOvnDnatRuleQueue.ShutDown()
×
1188
        c.updateOvnDnatRuleQueue.ShutDown()
×
1189
        c.delOvnDnatRuleQueue.ShutDown()
×
1190

×
1191
        if c.config.EnableNP {
×
1192
                c.updateNpQueue.ShutDown()
×
1193
                c.deleteNpQueue.ShutDown()
×
1194
        }
×
1195
        if c.config.EnableANP {
×
1196
                c.addAnpQueue.ShutDown()
×
1197
                c.updateAnpQueue.ShutDown()
×
1198
                c.deleteAnpQueue.ShutDown()
×
1199

×
1200
                c.addBanpQueue.ShutDown()
×
1201
                c.updateBanpQueue.ShutDown()
×
1202
                c.deleteBanpQueue.ShutDown()
×
1203
        }
×
1204

1205
        if c.config.EnableDNSNameResolver {
×
1206
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1207
                c.deleteDNSNameResolverQueue.ShutDown()
×
1208
        }
×
1209

1210
        c.addOrUpdateSgQueue.ShutDown()
×
1211
        c.delSgQueue.ShutDown()
×
1212
        c.syncSgPortsQueue.ShutDown()
×
1213

×
1214
        c.addOrUpdateCsrQueue.ShutDown()
×
1215

×
1216
        if c.config.EnableLiveMigrationOptimize {
×
1217
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1218
        }
×
1219
}
1220

1221
func (c *Controller) startWorkers(ctx context.Context) {
×
1222
        klog.Info("Starting workers")
×
1223

×
1224
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1225
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1226
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1227

×
1228
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1229
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1230
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1231
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1232
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1233
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1234
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1235
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1236
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1237
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1238
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1239
        // add default and join subnet and wait them ready
×
1240
        for range c.config.WorkerNum {
×
1241
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1242
        }
×
1243
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1244
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1245
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1246
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1247
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1248
                klog.Infof("wait for subnets %v ready", subnets)
×
1249

×
1250
                return c.allSubnetReady(subnets...)
×
1251
        })
×
1252
        if err != nil {
×
1253
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1254
        }
×
1255

1256
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1257
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1258
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1259

×
1260
        // run node worker before handle any pods
×
1261
        for range c.config.WorkerNum {
×
1262
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1263
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1264
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1265
        }
×
1266
        for {
×
1267
                ready := true
×
1268
                time.Sleep(3 * time.Second)
×
1269
                nodes, err := c.nodesLister.List(labels.Everything())
×
1270
                if err != nil {
×
1271
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1272
                }
×
1273
                for _, node := range nodes {
×
1274
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1275
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1276
                                ready = false
×
1277
                                break
×
1278
                        }
1279
                }
1280
                if ready {
×
1281
                        break
×
1282
                }
1283
        }
1284

1285
        if c.config.EnableLb {
×
1286
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1287
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1288
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1289

×
1290
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1291
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1292
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1293

×
1294
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1295
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1296
                go wait.Until(func() {
×
1297
                        c.resyncVpcDNSConfig()
×
1298
                }, 5*time.Second, ctx.Done())
×
1299
        }
1300

1301
        for range c.config.WorkerNum {
×
1302
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1303
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1304
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1305

×
1306
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1307
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1308
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1309
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1310
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1311

×
1312
                if c.config.EnableLb {
×
1313
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1314
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1315
                }
×
1316

1317
                if c.config.EnableNP {
×
1318
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1319
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1320
                }
×
1321

1322
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1323
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1324
        }
1325

1326
        if c.config.EnableEipSnat {
×
1327
                go wait.Until(func() {
×
1328
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1329
                        c.resyncExternalGateway()
×
1330
                }, time.Second, ctx.Done())
×
1331

1332
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1333
                c.OVNNbClient.MonitorBFD()
×
1334
        }
1335
        // TODO: we should merge these two vpc nat config into one config and resync them together
1336
        go wait.Until(func() {
×
1337
                c.resyncVpcNatGwConfig()
×
1338
        }, time.Second, ctx.Done())
×
1339

1340
        go wait.Until(func() {
×
1341
                c.resyncVpcNatConfig()
×
1342
        }, time.Second, ctx.Done())
×
1343

1344
        if c.config.GCInterval != 0 {
×
1345
                go wait.Until(func() {
×
1346
                        if err := c.markAndCleanLSP(); err != nil {
×
1347
                                klog.Errorf("gc lsp error: %v", err)
×
1348
                        }
×
1349
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1350
        }
1351

1352
        go wait.Until(func() {
×
1353
                if err := c.inspectPod(); err != nil {
×
1354
                        klog.Errorf("inspection error: %v", err)
×
1355
                }
×
1356
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1357

1358
        if c.config.EnableExternalVpc {
×
1359
                go wait.Until(func() {
×
1360
                        c.syncExternalVpc()
×
1361
                }, 5*time.Second, ctx.Done())
×
1362
        }
1363

1364
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1365
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1366
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1367
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1368

×
1369
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1370
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1371
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1373

×
1374
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1375
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1376
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1377

×
1378
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1379
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1380
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1381

×
1382
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1383
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1384
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1385

×
1386
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1387

×
1388
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1389
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1390
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1391

×
1392
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1393
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1394
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1395
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1396

×
1397
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1398
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1399
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1400
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1401

×
1402
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1403
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1404
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1405

×
1406
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1407
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1408
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1409

×
1410
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1411
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1412
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1413

×
1414
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1415
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1416
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1417

×
1418
        if c.config.EnableANP {
×
1419
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1420
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1421
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1422

×
1423
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1424
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1425
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1426
        }
×
1427

1428
        if c.config.EnableDNSNameResolver {
×
1429
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1430
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1431
        }
×
1432

1433
        if c.config.EnableLiveMigrationOptimize {
×
1434
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1435
        }
×
1436

1437
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1438

×
1439
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1440
}
1441

1442
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1443
        for _, lsName := range subnets {
2✔
1444
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1445
                if err != nil {
1✔
1446
                        klog.Error(err)
×
1447
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1448
                }
×
1449

1450
                if !exist {
2✔
1451
                        return false, nil
1✔
1452
                }
1✔
1453
        }
1454

1455
        return true, nil
1✔
1456
}
1457

1458
func (c *Controller) initResourceOnce() {
×
1459
        c.registerSubnetMetrics()
×
1460

×
1461
        if err := c.initNodeChassis(); err != nil {
×
1462
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1463
        }
×
1464

1465
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1466
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1467
        }
×
1468
        if err := c.syncSecurityGroup(); err != nil {
×
1469
                util.LogFatalAndExit(err, "failed to sync security group")
×
1470
        }
×
1471

1472
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1473
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1474
        }
×
1475

1476
        if err := c.initVpcNatGw(); err != nil {
×
1477
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1478
        }
×
1479
        if c.config.EnableLb {
×
1480
                if err := c.initVpcDNSConfig(); err != nil {
×
1481
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1482
                }
×
1483
        }
1484

1485
        // remove resources in ovndb that not exist any more in kubernetes resources
1486
        // process gc at last in case of affecting other init process
1487
        if err := c.gc(); err != nil {
×
1488
                util.LogFatalAndExit(err, "failed to run gc")
×
1489
        }
×
1490
}
1491

1492
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1493
        item, shutdown := queue.Get()
×
1494
        if shutdown {
×
1495
                return false
×
1496
        }
×
1497

1498
        err := func(item T) error {
×
1499
                defer queue.Done(item)
×
1500
                if err := handler(item); err != nil {
×
1501
                        queue.AddRateLimited(item)
×
1502
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1503
                }
×
1504
                queue.Forget(item)
×
1505
                return nil
×
1506
        }(item)
1507
        if err != nil {
×
1508
                utilruntime.HandleError(err)
×
1509
                return true
×
1510
        }
×
1511
        return true
×
1512
}
1513

1514
func getWorkItemKey(obj any) string {
×
1515
        switch v := obj.(type) {
×
1516
        case string:
×
1517
                return v
×
1518
        case *vpcService:
×
1519
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1520
        case *AdminNetworkPolicyChangedDelta:
×
1521
                return v.key
×
1522
        case *SlrInfo:
×
1523
                return v.Name
×
1524
        default:
×
1525
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1526
                if err != nil {
×
1527
                        utilruntime.HandleError(err)
×
1528
                        return ""
×
1529
                }
×
1530
                return key
×
1531
        }
1532
}
1533

1534
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1535
        return func() {
×
1536
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1537
                }
×
1538
        }
1539
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc