• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 17723172333

15 Sep 2025 05:35AM UTC coverage: 21.04% (-0.2%) from 21.229%
17723172333

push

github

web-flow
fix(deps): update module github.com/puzpuzpuz/xsync/v4 to v4.2.0 (#5722)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

10661 of 50669 relevant lines covered (21.04%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.2
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
11
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
12
        "github.com/puzpuzpuz/xsync/v4"
13
        "golang.org/x/time/rate"
14
        corev1 "k8s.io/api/core/v1"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
        "k8s.io/apimachinery/pkg/util/wait"
19
        kubeinformers "k8s.io/client-go/informers"
20
        "k8s.io/client-go/kubernetes/scheme"
21
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
22
        appsv1 "k8s.io/client-go/listers/apps/v1"
23
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
24
        v1 "k8s.io/client-go/listers/core/v1"
25
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
26
        netv1 "k8s.io/client-go/listers/networking/v1"
27
        "k8s.io/client-go/tools/cache"
28
        "k8s.io/client-go/tools/record"
29
        "k8s.io/client-go/util/workqueue"
30
        "k8s.io/klog/v2"
31
        "k8s.io/utils/keymutex"
32
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
33
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
34
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
35

36
        "github.com/kubeovn/kube-ovn/pkg/informer"
37

38
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
39
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
40
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
41
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
42
        "github.com/kubeovn/kube-ovn/pkg/ovs"
43
        "github.com/kubeovn/kube-ovn/pkg/util"
44
)
45

46
const controllerAgentName = "kube-ovn-controller"
47

48
const (
49
        logicalSwitchKey              = "ls"
50
        logicalRouterKey              = "lr"
51
        portGroupKey                  = "pg"
52
        networkPolicyKey              = "np"
53
        sgKey                         = "sg"
54
        associatedSgKeyPrefix         = "associated_sg_"
55
        sgsKey                        = "security_groups"
56
        u2oKey                        = "u2o"
57
        adminNetworkPolicyKey         = "anp"
58
        baselineAdminNetworkPolicyKey = "banp"
59
)
60

61
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
62
type Controller struct {
63
        config *Configuration
64

65
        ipam           *ovnipam.IPAM
66
        namedPort      *NamedPort
67
        anpPrioNameMap map[int32]string
68
        anpNamePrioMap map[string]int32
69

70
        OVNNbClient ovs.NbClient
71
        OVNSbClient ovs.SbClient
72

73
        // ExternalGatewayType define external gateway type, centralized
74
        ExternalGatewayType string
75

76
        podsLister             v1.PodLister
77
        podsSynced             cache.InformerSynced
78
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
79
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
80
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
81
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
82
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
83
        podKeyMutex            keymutex.KeyMutex
84

85
        vpcsLister           kubeovnlister.VpcLister
86
        vpcSynced            cache.InformerSynced
87
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
88
        vpcLastPoliciesMap   *xsync.Map[string, string]
89
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
90
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
91
        vpcKeyMutex          keymutex.KeyMutex
92

93
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
94
        vpcNatGatewaySynced           cache.InformerSynced
95
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
96
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
97
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
98
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
99
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
100
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
101
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
102
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
103
        vpcNatGwKeyMutex              keymutex.KeyMutex
104

105
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
106
        vpcEgressGatewaySynced           cache.InformerSynced
107
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
108
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
109
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
110

111
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
112
        switchLBRuleSynced      cache.InformerSynced
113
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
114
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
115
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
116

117
        vpcDNSLister           kubeovnlister.VpcDnsLister
118
        vpcDNSSynced           cache.InformerSynced
119
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
120
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
121

122
        subnetsLister           kubeovnlister.SubnetLister
123
        subnetSynced            cache.InformerSynced
124
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
125
        subnetLastVpcNameMap    *xsync.Map[string, string]
126
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
127
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
128
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
129
        subnetKeyMutex          keymutex.KeyMutex
130

131
        ippoolLister            kubeovnlister.IPPoolLister
132
        ippoolSynced            cache.InformerSynced
133
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
134
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
135
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
136
        ippoolKeyMutex          keymutex.KeyMutex
137

138
        ipsLister     kubeovnlister.IPLister
139
        ipSynced      cache.InformerSynced
140
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
141
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
142
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
143

144
        virtualIpsLister          kubeovnlister.VipLister
145
        virtualIpsSynced          cache.InformerSynced
146
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
147
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
148
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
149
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
150

151
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
152
        iptablesEipSynced      cache.InformerSynced
153
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
154
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
155
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
156
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
157

158
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
159
        iptablesFipSynced      cache.InformerSynced
160
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
161
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
162
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
163

164
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
165
        iptablesDnatRuleSynced      cache.InformerSynced
166
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
168
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
169

170
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
171
        iptablesSnatRuleSynced      cache.InformerSynced
172
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
173
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
174
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
175

176
        ovnEipsLister     kubeovnlister.OvnEipLister
177
        ovnEipSynced      cache.InformerSynced
178
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
179
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
180
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
181
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
182

183
        ovnFipsLister     kubeovnlister.OvnFipLister
184
        ovnFipSynced      cache.InformerSynced
185
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
187
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
190
        ovnSnatRuleSynced      cache.InformerSynced
191
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
193
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
196
        ovnDnatRuleSynced      cache.InformerSynced
197
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
199
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200

201
        providerNetworksLister kubeovnlister.ProviderNetworkLister
202
        providerNetworkSynced  cache.InformerSynced
203

204
        vlansLister     kubeovnlister.VlanLister
205
        vlanSynced      cache.InformerSynced
206
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
207
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
208
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
209
        vlanKeyMutex    keymutex.KeyMutex
210

211
        namespacesLister  v1.NamespaceLister
212
        namespacesSynced  cache.InformerSynced
213
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
214
        nsKeyMutex        keymutex.KeyMutex
215

216
        nodesLister     v1.NodeLister
217
        nodesSynced     cache.InformerSynced
218
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
219
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
220
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
221
        nodeKeyMutex    keymutex.KeyMutex
222

223
        servicesLister     v1.ServiceLister
224
        serviceSynced      cache.InformerSynced
225
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
226
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
227
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
228
        svcKeyMutex        keymutex.KeyMutex
229

230
        endpointSlicesLister          discoveryv1.EndpointSliceLister
231
        endpointSlicesSynced          cache.InformerSynced
232
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
233
        epKeyMutex                    keymutex.KeyMutex
234

235
        deploymentsLister appsv1.DeploymentLister
236
        deploymentsSynced cache.InformerSynced
237

238
        npsLister     netv1.NetworkPolicyLister
239
        npsSynced     cache.InformerSynced
240
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
241
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
242
        npKeyMutex    keymutex.KeyMutex
243

244
        sgsLister          kubeovnlister.SecurityGroupLister
245
        sgSynced           cache.InformerSynced
246
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
247
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
248
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
249
        sgKeyMutex         keymutex.KeyMutex
250

251
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
252
        qosPolicySynced      cache.InformerSynced
253
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
254
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
255
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
256

257
        configMapsLister v1.ConfigMapLister
258
        configMapsSynced cache.InformerSynced
259

260
        anpsLister     anplister.AdminNetworkPolicyLister
261
        anpsSynced     cache.InformerSynced
262
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
263
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
264
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
265
        anpKeyMutex    keymutex.KeyMutex
266

267
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
268
        dnsNameResolversSynced          cache.InformerSynced
269
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
270
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
271

272
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
273
        banpsSynced     cache.InformerSynced
274
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
275
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
276
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
277
        banpKeyMutex    keymutex.KeyMutex
278

279
        csrLister           certListerv1.CertificateSigningRequestLister
280
        csrSynced           cache.InformerSynced
281
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
282

283
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
284
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
285
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
286

287
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
288
        netAttachSynced          cache.InformerSynced
289
        netAttachInformerFactory netAttach.SharedInformerFactory
290

291
        recorder               record.EventRecorder
292
        informerFactory        kubeinformers.SharedInformerFactory
293
        cmInformerFactory      kubeinformers.SharedInformerFactory
294
        deployInformerFactory  kubeinformers.SharedInformerFactory
295
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
296
        anpInformerFactory     anpinformer.SharedInformerFactory
297

298
        // Database health check
299
        dbFailureCount int
300
}
301

302
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
303
        if rateLimiter == nil {
2✔
304
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
305
        }
1✔
306
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
307
}
308

309
// Run creates and runs a new ovn controller
310
func Run(ctx context.Context, config *Configuration) {
×
311
        klog.V(4).Info("Creating event broadcaster")
×
312
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
313
        eventBroadcaster.StartLogging(klog.Infof)
×
314
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
315
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
316
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
317
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
318
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
319
        )
×
320

×
321
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
322
        if err != nil {
×
323
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
324
        }
×
325

326
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
327
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
328
                        listOption.AllowWatchBookmarks = true
×
329
                }))
×
330
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
331
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
332
                        listOption.AllowWatchBookmarks = true
×
333
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
334
        // deployment informer used to list/watch vpc egress gateway workloads
335
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
336
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
337
                        listOption.AllowWatchBookmarks = true
×
338
                        listOption.LabelSelector = selector.String()
×
339
                }))
×
340
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
341
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
342
                        listOption.AllowWatchBookmarks = true
×
343
                }))
×
344
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
345
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
346
                        listOption.AllowWatchBookmarks = true
×
347
                }))
×
348

349
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
350
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
351
                        listOption.AllowWatchBookmarks = true
×
352
                }),
×
353
        )
354

355
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
356

×
357
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
358
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
359
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
360
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
361
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
362
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
363
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
364
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
365
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
366
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
367
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
368
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
369
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
370
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
371
        podInformer := informerFactory.Core().V1().Pods()
×
372
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
373
        nodeInformer := informerFactory.Core().V1().Nodes()
×
374
        serviceInformer := informerFactory.Core().V1().Services()
×
375
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
376
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
377
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
378
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
379
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
380
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
381
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
382
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
383
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
384
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
385
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
386
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
387
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
388
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
389
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
390
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
391

×
392
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
393
        controller := &Controller{
×
394
                config:             config,
×
395
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
396
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
397
                ipam:               ovnipam.NewIPAM(),
×
398
                namedPort:          NewNamedPort(),
×
399

×
400
                vpcsLister:           vpcInformer.Lister(),
×
401
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
402
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
403
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
404
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
405
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
406
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
407

×
408
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
409
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
410
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
411
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
412
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
413
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
414
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
415
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
416
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
417
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
418
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
419

×
420
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
421
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
422
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
423
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
424
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
425

×
426
                subnetsLister:           subnetInformer.Lister(),
×
427
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
428
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
429
                subnetLastVpcNameMap:    xsync.NewMap[string, string](),
×
430
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
431
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
432
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
433
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
434

×
435
                ippoolLister:            ippoolInformer.Lister(),
×
436
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
437
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
438
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
439
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
440
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
441

×
442
                ipsLister:     ipInformer.Lister(),
×
443
                ipSynced:      ipInformer.Informer().HasSynced,
×
444
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
445
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
446
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
447

×
448
                virtualIpsLister:          virtualIPInformer.Lister(),
×
449
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
450
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
451
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
452
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
453
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
454

×
455
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
456
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
457
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
458
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
459
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
460
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
461

×
462
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
463
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
464
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
465
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
466
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
467

×
468
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
469
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
470
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
471
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
472
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
473

×
474
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
475
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
476
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
477
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
478
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
479

×
480
                vlansLister:     vlanInformer.Lister(),
×
481
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
482
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
483
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
484
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
485
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
486

×
487
                providerNetworksLister: providerNetworkInformer.Lister(),
×
488
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
489

×
490
                podsLister:          podInformer.Lister(),
×
491
                podsSynced:          podInformer.Informer().HasSynced,
×
492
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
493
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
494
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
495
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
496
                                Name:          "DeletePod",
×
497
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
498
                        },
×
499
                ),
×
500
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
501
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
502

×
503
                namespacesLister:  namespaceInformer.Lister(),
×
504
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
505
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
506
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
507

×
508
                nodesLister:     nodeInformer.Lister(),
×
509
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
510
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
511
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
512
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
513
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
514

×
515
                servicesLister:     serviceInformer.Lister(),
×
516
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
517
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
518
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
519
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
520
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
521

×
522
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
523
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
524
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
525
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
526

×
527
                deploymentsLister: deploymentInformer.Lister(),
×
528
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
529

×
530
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
531
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
532
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
533
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
534
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
535

×
536
                configMapsLister: configMapInformer.Lister(),
×
537
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
538

×
539
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
540
                sgsLister:          sgInformer.Lister(),
×
541
                sgSynced:           sgInformer.Informer().HasSynced,
×
542
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
543
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
544
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
545

×
546
                ovnEipsLister:     ovnEipInformer.Lister(),
×
547
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
548
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
549
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
550
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
551
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
552

×
553
                ovnFipsLister:     ovnFipInformer.Lister(),
×
554
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
555
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
556
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
557
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
558

×
559
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
560
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
561
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
562
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
563
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
564

×
565
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
566
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
567
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
568
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
569
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
570

×
571
                csrLister:           csrInformer.Lister(),
×
572
                csrSynced:           csrInformer.Informer().HasSynced,
×
573
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
574

×
575
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
576
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
577
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
578

×
579
                netAttachLister:          netAttachInformer.Lister(),
×
580
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
581
                netAttachInformerFactory: attachNetInformerFactory,
×
582

×
583
                recorder:               recorder,
×
584
                informerFactory:        informerFactory,
×
585
                cmInformerFactory:      cmInformerFactory,
×
586
                deployInformerFactory:  deployInformerFactory,
×
587
                kubeovnInformerFactory: kubeovnInformerFactory,
×
588
                anpInformerFactory:     anpInformerFactory,
×
589
        }
×
590

×
591
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
592
                config.OvnNbAddr,
×
593
                config.OvnTimeout,
×
594
                config.OvsDbConnectTimeout,
×
595
                config.OvsDbInactivityTimeout,
×
596
                config.OvsDbConnectMaxRetry,
×
597
        ); err != nil {
×
598
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
599
        }
×
600
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
601
                config.OvnSbAddr,
×
602
                config.OvnTimeout,
×
603
                config.OvsDbConnectTimeout,
×
604
                config.OvsDbInactivityTimeout,
×
605
                config.OvsDbConnectMaxRetry,
×
606
        ); err != nil {
×
607
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
608
        }
×
609
        if config.EnableLb {
×
610
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
611
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
612
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
613
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
614
                        "DeleteSwitchLBRule",
×
615
                        workqueue.NewTypedMaxOfRateLimiter(
×
616
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
617
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
618
                        ),
×
619
                )
×
620
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
621
                        "UpdateSwitchLBRule",
×
622
                        workqueue.NewTypedMaxOfRateLimiter(
×
623
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
624
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
625
                        ),
×
626
                )
×
627

×
628
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
629
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
630
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
631
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
632
        }
×
633

634
        if config.EnableNP {
×
635
                controller.npsLister = npInformer.Lister()
×
636
                controller.npsSynced = npInformer.Informer().HasSynced
×
637
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
638
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
639
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
640
        }
×
641

642
        if config.EnableANP {
×
643
                controller.anpsLister = anpInformer.Lister()
×
644
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
645
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
646
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
647
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
648
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
649

×
650
                controller.banpsLister = banpInformer.Lister()
×
651
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
652
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
653
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
654
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
655
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
656
        }
×
657

658
        if config.EnableDNSNameResolver {
×
659
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
660
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
661
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
662
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
663
        }
×
664

665
        defer controller.shutdown()
×
666
        klog.Info("Starting OVN controller")
×
667

×
668
        // Wait for the caches to be synced before starting workers
×
669
        controller.informerFactory.Start(ctx.Done())
×
670
        controller.cmInformerFactory.Start(ctx.Done())
×
671
        controller.deployInformerFactory.Start(ctx.Done())
×
672
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
673
        controller.anpInformerFactory.Start(ctx.Done())
×
674
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
675
        controller.StartNetAttachInformerFactory(ctx)
×
676

×
677
        klog.Info("Waiting for informer caches to sync")
×
678
        cacheSyncs := []cache.InformerSynced{
×
679
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
680
                controller.vpcSynced, controller.subnetSynced,
×
681
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
682
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
683
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
684
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
685
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
686
                controller.ovnDnatRuleSynced,
×
687
        }
×
688
        if controller.config.EnableLb {
×
689
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
690
        }
×
691
        if controller.config.EnableNP {
×
692
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
693
        }
×
694
        if controller.config.EnableANP {
×
695
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
696
        }
×
697
        if controller.config.EnableDNSNameResolver {
×
698
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
699
        }
×
700

701
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
702
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
703
        }
×
704

705
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
706
                AddFunc:    controller.enqueueAddPod,
×
707
                DeleteFunc: controller.enqueueDeletePod,
×
708
                UpdateFunc: controller.enqueueUpdatePod,
×
709
        }); err != nil {
×
710
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
711
        }
×
712

713
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
714
                AddFunc:    controller.enqueueAddNamespace,
×
715
                UpdateFunc: controller.enqueueUpdateNamespace,
×
716
                DeleteFunc: controller.enqueueDeleteNamespace,
×
717
        }); err != nil {
×
718
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
719
        }
×
720

721
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
722
                AddFunc:    controller.enqueueAddNode,
×
723
                UpdateFunc: controller.enqueueUpdateNode,
×
724
                DeleteFunc: controller.enqueueDeleteNode,
×
725
        }); err != nil {
×
726
                util.LogFatalAndExit(err, "failed to add node event handler")
×
727
        }
×
728

729
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
730
                AddFunc:    controller.enqueueAddService,
×
731
                DeleteFunc: controller.enqueueDeleteService,
×
732
                UpdateFunc: controller.enqueueUpdateService,
×
733
        }); err != nil {
×
734
                util.LogFatalAndExit(err, "failed to add service event handler")
×
735
        }
×
736

737
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
738
                AddFunc:    controller.enqueueAddEndpointSlice,
×
739
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
740
        }); err != nil {
×
741
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
742
        }
×
743

744
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
745
                AddFunc:    controller.enqueueAddDeployment,
×
746
                UpdateFunc: controller.enqueueUpdateDeployment,
×
747
        }); err != nil {
×
748
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
749
        }
×
750

751
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
752
                AddFunc:    controller.enqueueAddVpc,
×
753
                UpdateFunc: controller.enqueueUpdateVpc,
×
754
                DeleteFunc: controller.enqueueDelVpc,
×
755
        }); err != nil {
×
756
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
757
        }
×
758

759
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
760
                AddFunc:    controller.enqueueAddVpcNatGw,
×
761
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
762
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
763
        }); err != nil {
×
764
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
765
        }
×
766

767
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
768
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
769
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
770
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
771
        }); err != nil {
×
772
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
773
        }
×
774

775
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
776
                AddFunc:    controller.enqueueAddSubnet,
×
777
                UpdateFunc: controller.enqueueUpdateSubnet,
×
778
                DeleteFunc: controller.enqueueDeleteSubnet,
×
779
        }); err != nil {
×
780
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
781
        }
×
782

783
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
784
                AddFunc:    controller.enqueueAddIPPool,
×
785
                UpdateFunc: controller.enqueueUpdateIPPool,
×
786
                DeleteFunc: controller.enqueueDeleteIPPool,
×
787
        }); err != nil {
×
788
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
789
        }
×
790

791
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
792
                AddFunc:    controller.enqueueAddIP,
×
793
                UpdateFunc: controller.enqueueUpdateIP,
×
794
                DeleteFunc: controller.enqueueDelIP,
×
795
        }); err != nil {
×
796
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
797
        }
×
798

799
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
800
                AddFunc:    controller.enqueueAddVlan,
×
801
                DeleteFunc: controller.enqueueDelVlan,
×
802
                UpdateFunc: controller.enqueueUpdateVlan,
×
803
        }); err != nil {
×
804
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
805
        }
×
806

807
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
808
                AddFunc:    controller.enqueueAddSg,
×
809
                DeleteFunc: controller.enqueueDeleteSg,
×
810
                UpdateFunc: controller.enqueueUpdateSg,
×
811
        }); err != nil {
×
812
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
813
        }
×
814

815
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
816
                AddFunc:    controller.enqueueAddVirtualIP,
×
817
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
818
                DeleteFunc: controller.enqueueDelVirtualIP,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
821
        }
×
822

823
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddIptablesEip,
×
825
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
826
                DeleteFunc: controller.enqueueDelIptablesEip,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
829
        }
×
830

831
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddIptablesFip,
×
833
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
834
                DeleteFunc: controller.enqueueDelIptablesFip,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
837
        }
×
838

839
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
841
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
842
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
845
        }
×
846

847
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
849
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
850
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
853
        }
×
854

855
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                AddFunc:    controller.enqueueAddOvnEip,
×
857
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
858
                DeleteFunc: controller.enqueueDelOvnEip,
×
859
        }); err != nil {
×
860
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
861
        }
×
862

863
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                AddFunc:    controller.enqueueAddOvnFip,
×
865
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
866
                DeleteFunc: controller.enqueueDelOvnFip,
×
867
        }); err != nil {
×
868
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
869
        }
×
870

871
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
872
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
873
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
874
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
875
        }); err != nil {
×
876
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
877
        }
×
878

879
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
880
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
881
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
882
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
883
        }); err != nil {
×
884
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
885
        }
×
886

887
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
888
                AddFunc:    controller.enqueueAddQoSPolicy,
×
889
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
890
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
891
        }); err != nil {
×
892
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
893
        }
×
894

895
        if config.EnableLb {
×
896
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
898
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
899
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
900
                }); err != nil {
×
901
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
902
                }
×
903

904
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                        AddFunc:    controller.enqueueAddVpcDNS,
×
906
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
907
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
908
                }); err != nil {
×
909
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
910
                }
×
911
        }
912

913
        if config.EnableNP {
×
914
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
915
                        AddFunc:    controller.enqueueAddNp,
×
916
                        UpdateFunc: controller.enqueueUpdateNp,
×
917
                        DeleteFunc: controller.enqueueDeleteNp,
×
918
                }); err != nil {
×
919
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
920
                }
×
921
        }
922

923
        if config.EnableANP {
×
924
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
925
                        AddFunc:    controller.enqueueAddAnp,
×
926
                        UpdateFunc: controller.enqueueUpdateAnp,
×
927
                        DeleteFunc: controller.enqueueDeleteAnp,
×
928
                }); err != nil {
×
929
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
930
                }
×
931

932
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
933
                        AddFunc:    controller.enqueueAddBanp,
×
934
                        UpdateFunc: controller.enqueueUpdateBanp,
×
935
                        DeleteFunc: controller.enqueueDeleteBanp,
×
936
                }); err != nil {
×
937
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
938
                }
×
939

940
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
941
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
942
        }
943

944
        if config.EnableDNSNameResolver {
×
945
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
946
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
947
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
948
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
949
                }); err != nil {
×
950
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
951
                }
×
952
        }
953

954
        if config.EnableOVNIPSec {
×
955
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
956
                        AddFunc:    controller.enqueueAddCsr,
×
957
                        UpdateFunc: controller.enqueueUpdateCsr,
×
958
                        // no need to add delete func for csr
×
959
                }); err != nil {
×
960
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
961
                }
×
962
        }
963

964
        controller.Run(ctx)
×
965
}
966

967
// Run will set up the event handlers for types we are interested in, as well
968
// as syncing informer caches and starting workers. It will block until stopCh
969
// is closed, at which point it will shutdown the workqueue and wait for
970
// workers to finish processing their current work items.
971
func (c *Controller) Run(ctx context.Context) {
×
972
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
973
        // Otherwise, the init process should be placed after all workers have already started working
×
974
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
975
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
976
        }
×
977

978
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
979
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
980
        }
×
981

982
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
983
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
984
        }
×
985

986
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
987
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
988
        }
×
989

990
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
991
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
992
        }
×
993

994
        if err := c.InitOVN(); err != nil {
×
995
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
996
        }
×
997

998
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
999
        if err := c.syncIPCR(); err != nil {
×
1000
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1001
        }
×
1002

1003
        if err := c.syncFinalizers(); err != nil {
×
1004
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1005
        }
×
1006

1007
        if err := c.InitIPAM(); err != nil {
×
1008
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1009
        }
×
1010

1011
        if err := c.syncNodeRoutes(); err != nil {
×
1012
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1013
        }
×
1014

1015
        if err := c.syncSubnetCR(); err != nil {
×
1016
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1017
        }
×
1018

1019
        if err := c.syncVlanCR(); err != nil {
×
1020
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1021
        }
×
1022

1023
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1024
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1025
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1026
                }
×
1027
        }
1028

1029
        // start workers to do all the network operations
1030
        c.startWorkers(ctx)
×
1031

×
1032
        c.initResourceOnce()
×
1033
        <-ctx.Done()
×
1034
        klog.Info("Shutting down workers")
×
1035
}
1036

1037
func (c *Controller) dbStatus() {
×
1038
        const maxFailures = 5
×
1039

×
1040
        done := make(chan error, 2)
×
1041
        go func() {
×
1042
                done <- c.OVNNbClient.Echo(context.Background())
×
1043
        }()
×
1044
        go func() {
×
1045
                done <- c.OVNSbClient.Echo(context.Background())
×
1046
        }()
×
1047

1048
        resultsReceived := 0
×
1049
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1050

×
1051
        for resultsReceived < 2 {
×
1052
                select {
×
1053
                case err := <-done:
×
1054
                        resultsReceived++
×
1055
                        if err != nil {
×
1056
                                c.dbFailureCount++
×
1057
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1058
                                if c.dbFailureCount >= maxFailures {
×
1059
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1060
                                }
×
1061
                                return
×
1062
                        }
1063
                case <-timeout:
×
1064
                        c.dbFailureCount++
×
1065
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1066
                        if c.dbFailureCount >= maxFailures {
×
1067
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1068
                        }
×
1069
                        return
×
1070
                }
1071
        }
1072

1073
        if c.dbFailureCount > 0 {
×
1074
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1075
                c.dbFailureCount = 0
×
1076
        }
×
1077
}
1078

1079
func (c *Controller) shutdown() {
×
1080
        utilruntime.HandleCrash()
×
1081

×
1082
        c.addOrUpdatePodQueue.ShutDown()
×
1083
        c.deletePodQueue.ShutDown()
×
1084
        c.updatePodSecurityQueue.ShutDown()
×
1085

×
1086
        c.addNamespaceQueue.ShutDown()
×
1087

×
1088
        c.addOrUpdateSubnetQueue.ShutDown()
×
1089
        c.deleteSubnetQueue.ShutDown()
×
1090
        c.updateSubnetStatusQueue.ShutDown()
×
1091
        c.syncVirtualPortsQueue.ShutDown()
×
1092

×
1093
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1094
        c.updateIPPoolStatusQueue.ShutDown()
×
1095
        c.deleteIPPoolQueue.ShutDown()
×
1096

×
1097
        c.addNodeQueue.ShutDown()
×
1098
        c.updateNodeQueue.ShutDown()
×
1099
        c.deleteNodeQueue.ShutDown()
×
1100

×
1101
        c.addServiceQueue.ShutDown()
×
1102
        c.deleteServiceQueue.ShutDown()
×
1103
        c.updateServiceQueue.ShutDown()
×
1104
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1105

×
1106
        c.addVlanQueue.ShutDown()
×
1107
        c.delVlanQueue.ShutDown()
×
1108
        c.updateVlanQueue.ShutDown()
×
1109

×
1110
        c.addOrUpdateVpcQueue.ShutDown()
×
1111
        c.updateVpcStatusQueue.ShutDown()
×
1112
        c.delVpcQueue.ShutDown()
×
1113

×
1114
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1115
        c.initVpcNatGatewayQueue.ShutDown()
×
1116
        c.delVpcNatGatewayQueue.ShutDown()
×
1117
        c.updateVpcEipQueue.ShutDown()
×
1118
        c.updateVpcFloatingIPQueue.ShutDown()
×
1119
        c.updateVpcDnatQueue.ShutDown()
×
1120
        c.updateVpcSnatQueue.ShutDown()
×
1121
        c.updateVpcSubnetQueue.ShutDown()
×
1122

×
1123
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1124
        c.delVpcEgressGatewayQueue.ShutDown()
×
1125

×
1126
        if c.config.EnableLb {
×
1127
                c.addSwitchLBRuleQueue.ShutDown()
×
1128
                c.delSwitchLBRuleQueue.ShutDown()
×
1129
                c.updateSwitchLBRuleQueue.ShutDown()
×
1130

×
1131
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1132
                c.delVpcDNSQueue.ShutDown()
×
1133
        }
×
1134

1135
        c.addIPQueue.ShutDown()
×
1136
        c.updateIPQueue.ShutDown()
×
1137
        c.delIPQueue.ShutDown()
×
1138

×
1139
        c.addVirtualIPQueue.ShutDown()
×
1140
        c.updateVirtualIPQueue.ShutDown()
×
1141
        c.updateVirtualParentsQueue.ShutDown()
×
1142
        c.delVirtualIPQueue.ShutDown()
×
1143

×
1144
        c.addIptablesEipQueue.ShutDown()
×
1145
        c.updateIptablesEipQueue.ShutDown()
×
1146
        c.resetIptablesEipQueue.ShutDown()
×
1147
        c.delIptablesEipQueue.ShutDown()
×
1148

×
1149
        c.addIptablesFipQueue.ShutDown()
×
1150
        c.updateIptablesFipQueue.ShutDown()
×
1151
        c.delIptablesFipQueue.ShutDown()
×
1152

×
1153
        c.addIptablesDnatRuleQueue.ShutDown()
×
1154
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1155
        c.delIptablesDnatRuleQueue.ShutDown()
×
1156

×
1157
        c.addIptablesSnatRuleQueue.ShutDown()
×
1158
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1159
        c.delIptablesSnatRuleQueue.ShutDown()
×
1160

×
1161
        c.addQoSPolicyQueue.ShutDown()
×
1162
        c.updateQoSPolicyQueue.ShutDown()
×
1163
        c.delQoSPolicyQueue.ShutDown()
×
1164

×
1165
        c.addOvnEipQueue.ShutDown()
×
1166
        c.updateOvnEipQueue.ShutDown()
×
1167
        c.resetOvnEipQueue.ShutDown()
×
1168
        c.delOvnEipQueue.ShutDown()
×
1169

×
1170
        c.addOvnFipQueue.ShutDown()
×
1171
        c.updateOvnFipQueue.ShutDown()
×
1172
        c.delOvnFipQueue.ShutDown()
×
1173

×
1174
        c.addOvnSnatRuleQueue.ShutDown()
×
1175
        c.updateOvnSnatRuleQueue.ShutDown()
×
1176
        c.delOvnSnatRuleQueue.ShutDown()
×
1177

×
1178
        c.addOvnDnatRuleQueue.ShutDown()
×
1179
        c.updateOvnDnatRuleQueue.ShutDown()
×
1180
        c.delOvnDnatRuleQueue.ShutDown()
×
1181

×
1182
        if c.config.EnableNP {
×
1183
                c.updateNpQueue.ShutDown()
×
1184
                c.deleteNpQueue.ShutDown()
×
1185
        }
×
1186
        if c.config.EnableANP {
×
1187
                c.addAnpQueue.ShutDown()
×
1188
                c.updateAnpQueue.ShutDown()
×
1189
                c.deleteAnpQueue.ShutDown()
×
1190

×
1191
                c.addBanpQueue.ShutDown()
×
1192
                c.updateBanpQueue.ShutDown()
×
1193
                c.deleteBanpQueue.ShutDown()
×
1194
        }
×
1195

1196
        if c.config.EnableDNSNameResolver {
×
1197
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1198
                c.deleteDNSNameResolverQueue.ShutDown()
×
1199
        }
×
1200

1201
        c.addOrUpdateSgQueue.ShutDown()
×
1202
        c.delSgQueue.ShutDown()
×
1203
        c.syncSgPortsQueue.ShutDown()
×
1204

×
1205
        c.addOrUpdateCsrQueue.ShutDown()
×
1206

×
1207
        if c.config.EnableLiveMigrationOptimize {
×
1208
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1209
        }
×
1210
}
1211

1212
func (c *Controller) startWorkers(ctx context.Context) {
×
1213
        klog.Info("Starting workers")
×
1214

×
1215
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1216
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1217
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1218

×
1219
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1220
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1221
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1222
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1223
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1224
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1225
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1226
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1227
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1228
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1229
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1230
        // add default and join subnet and wait them ready
×
1231
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1232
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1233
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1234
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1235
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1236
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1237
                klog.Infof("wait for subnets %v ready", subnets)
×
1238

×
1239
                return c.allSubnetReady(subnets...)
×
1240
        })
×
1241
        if err != nil {
×
1242
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1243
        }
×
1244

1245
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1246
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1247
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1248

×
1249
        // run node worker before handle any pods
×
1250
        for range c.config.WorkerNum {
×
1251
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1252
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1253
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1254
        }
×
1255
        for {
×
1256
                ready := true
×
1257
                time.Sleep(3 * time.Second)
×
1258
                nodes, err := c.nodesLister.List(labels.Everything())
×
1259
                if err != nil {
×
1260
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1261
                }
×
1262
                for _, node := range nodes {
×
1263
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1264
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1265
                                ready = false
×
1266
                                break
×
1267
                        }
1268
                }
1269
                if ready {
×
1270
                        break
×
1271
                }
1272
        }
1273

1274
        if c.config.EnableLb {
×
1275
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1276
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1277
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1278

×
1279
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1280
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1281
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1282

×
1283
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1284
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1285
                go wait.Until(func() {
×
1286
                        c.resyncVpcDNSConfig()
×
1287
                }, 5*time.Second, ctx.Done())
×
1288
        }
1289

1290
        for range c.config.WorkerNum {
×
1291
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1292
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1293
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1294

×
1295
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1296
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1297
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1298
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1299
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1300

×
1301
                if c.config.EnableLb {
×
1302
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1303
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1304
                }
×
1305

1306
                if c.config.EnableNP {
×
1307
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1308
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1309
                }
×
1310

1311
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1312
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1313
        }
1314

1315
        if c.config.EnableEipSnat {
×
1316
                go wait.Until(func() {
×
1317
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1318
                        c.resyncExternalGateway()
×
1319
                }, time.Second, ctx.Done())
×
1320

1321
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1322
                c.OVNNbClient.MonitorBFD()
×
1323
        }
1324
        // TODO: we should merge these two vpc nat config into one config and resync them together
1325
        go wait.Until(func() {
×
1326
                c.resyncVpcNatGwConfig()
×
1327
        }, time.Second, ctx.Done())
×
1328

1329
        go wait.Until(func() {
×
1330
                c.resyncVpcNatConfig()
×
1331
        }, time.Second, ctx.Done())
×
1332

1333
        if c.config.GCInterval != 0 {
×
1334
                go wait.Until(func() {
×
1335
                        if err := c.markAndCleanLSP(); err != nil {
×
1336
                                klog.Errorf("gc lsp error: %v", err)
×
1337
                        }
×
1338
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1339
        }
1340

1341
        go wait.Until(func() {
×
1342
                if err := c.inspectPod(); err != nil {
×
1343
                        klog.Errorf("inspection error: %v", err)
×
1344
                }
×
1345
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1346

1347
        if c.config.EnableExternalVpc {
×
1348
                go wait.Until(func() {
×
1349
                        c.syncExternalVpc()
×
1350
                }, 5*time.Second, ctx.Done())
×
1351
        }
1352

1353
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1354
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1355
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1356

×
1357
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1358
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1359
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1360
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1361

×
1362
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1363
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1364
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1365

×
1366
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1367
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1368
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1369

×
1370
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1371
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1373

×
1374
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1375

×
1376
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1377
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1378
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1379

×
1380
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1381
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1382
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1383
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1384

×
1385
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1386
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1387
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1388
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1389

×
1390
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1391
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1392
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1393

×
1394
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1395
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1396
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1397

×
1398
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1399
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1400
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1401

×
1402
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1403
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1404
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1405

×
1406
        if c.config.EnableANP {
×
1407
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1408
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1409
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1410

×
1411
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1412
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1413
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1414
        }
×
1415

1416
        if c.config.EnableDNSNameResolver {
×
1417
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1418
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1419
        }
×
1420

1421
        if c.config.EnableLiveMigrationOptimize {
×
1422
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1423
        }
×
1424

1425
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1426

×
1427
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1428
}
1429

1430
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1431
        for _, lsName := range subnets {
2✔
1432
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1433
                if err != nil {
1✔
1434
                        klog.Error(err)
×
1435
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1436
                }
×
1437

1438
                if !exist {
2✔
1439
                        return false, nil
1✔
1440
                }
1✔
1441
        }
1442

1443
        return true, nil
1✔
1444
}
1445

1446
func (c *Controller) initResourceOnce() {
×
1447
        c.registerSubnetMetrics()
×
1448

×
1449
        if err := c.initNodeChassis(); err != nil {
×
1450
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1451
        }
×
1452

1453
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1454
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1455
        }
×
1456
        if err := c.syncSecurityGroup(); err != nil {
×
1457
                util.LogFatalAndExit(err, "failed to sync security group")
×
1458
        }
×
1459

1460
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1461
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1462
        }
×
1463

1464
        if err := c.initVpcNatGw(); err != nil {
×
1465
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1466
        }
×
1467
        if c.config.EnableLb {
×
1468
                if err := c.initVpcDNSConfig(); err != nil {
×
1469
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1470
                }
×
1471
        }
1472

1473
        // remove resources in ovndb that not exist any more in kubernetes resources
1474
        // process gc at last in case of affecting other init process
1475
        if err := c.gc(); err != nil {
×
1476
                util.LogFatalAndExit(err, "failed to run gc")
×
1477
        }
×
1478
}
1479

1480
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1481
        item, shutdown := queue.Get()
×
1482
        if shutdown {
×
1483
                return false
×
1484
        }
×
1485

1486
        err := func(item T) error {
×
1487
                defer queue.Done(item)
×
1488
                if err := handler(item); err != nil {
×
1489
                        queue.AddRateLimited(item)
×
1490
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1491
                }
×
1492
                queue.Forget(item)
×
1493
                return nil
×
1494
        }(item)
1495
        if err != nil {
×
1496
                utilruntime.HandleError(err)
×
1497
                return true
×
1498
        }
×
1499
        return true
×
1500
}
1501

1502
func getWorkItemKey(obj any) string {
×
1503
        switch v := obj.(type) {
×
1504
        case string:
×
1505
                return v
×
1506
        case *vpcService:
×
1507
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1508
        case *AdminNetworkPolicyChangedDelta:
×
1509
                return v.key
×
1510
        case *SlrInfo:
×
1511
                return v.Name
×
1512
        default:
×
1513
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1514
                if err != nil {
×
1515
                        utilruntime.HandleError(err)
×
1516
                        return ""
×
1517
                }
×
1518
                return key
×
1519
        }
1520
}
1521

1522
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1523
        return func() {
×
1524
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1525
                }
×
1526
        }
1527
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc