• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21348330402

26 Jan 2026 06:24AM UTC coverage: 22.918% (+0.02%) from 22.899%
21348330402

push

github

zbb88888
fix: caching NAD CRD should before all kubeovn crds and pod (#6198)

* fix: caching NAD CRD should before all kubeovn crds and pod

---------

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>

15 of 52 new or added lines in 3 files covered. (28.85%)

1 existing line in 1 file now uncovered.

12303 of 53683 relevant lines covered (22.92%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.15
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync/atomic"
9
        "time"
10

11
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
12
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
13
        "github.com/puzpuzpuz/xsync/v4"
14
        "golang.org/x/time/rate"
15
        corev1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/discovery"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        appsv1 "k8s.io/client-go/listers/apps/v1"
26
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
27
        v1 "k8s.io/client-go/listers/core/v1"
28
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
29
        netv1 "k8s.io/client-go/listers/networking/v1"
30
        "k8s.io/client-go/tools/cache"
31
        "k8s.io/client-go/tools/record"
32
        "k8s.io/client-go/util/workqueue"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/keymutex"
35
        "k8s.io/utils/set"
36
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
37
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
38
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
39
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
40
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
41

42
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
43
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
44
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
45
        "github.com/kubeovn/kube-ovn/pkg/informer"
46
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
47
        "github.com/kubeovn/kube-ovn/pkg/ovs"
48
        "github.com/kubeovn/kube-ovn/pkg/util"
49
)
50

51
// controllerAgentName is the component name set on the event source of the
// event recorder created in Run.
const controllerAgentName = "kube-ovn-controller"
52

53
// Short string keys used by the controller package.
// NOTE(review): the use sites are outside this view; presumably each key
// tags or identifies an object by kind (logical switch, logical router,
// port group, network policy, security group, ...) — confirm at the callers.
const (
54
        logicalSwitchKey              = "ls"
55
        logicalRouterKey              = "lr"
56
        portGroupKey                  = "pg"
57
        networkPolicyKey              = "np"
58
        sgKey                         = "sg"
59
        sgsKey                        = "security_groups"
60
        u2oKey                        = "u2o"
61
        adminNetworkPolicyKey         = "anp"
62
        baselineAdminNetworkPolicyKey = "banp"
63
        ippoolKey                     = "ippool"
64
        clusterNetworkPolicyKey       = "cnp"
65
)
66

67
// Controller is the main kube-ovn controller: it watches Kubernetes resources
// (namespaces, pods, nodes, services, endpoints) and operates OVN accordingly.
//
// For most resource kinds it keeps a lister, an informer-synced check, the
// rate-limited work queues for that kind and, where present, a per-key mutex.
68
type Controller struct {
69
        // config is the configuration handed to Run.
        config *Configuration
70

71
        // IPAM and named-port state, plus int32<->string maps for anp/bnp
        // (presumably priority<->name lookups for admin and baseline admin
        // network policies — confirm at the use sites).
        ipam           *ovnipam.IPAM
72
        namedPort      *NamedPort
73
        anpPrioNameMap map[int32]string
74
        anpNamePrioMap map[string]int32
75
        bnpPrioNameMap map[int32]string
76
        bnpNamePrioMap map[string]int32
77

78
        // OVN clients (presumably northbound and southbound databases —
        // confirm in pkg/ovs).
        OVNNbClient ovs.NbClient
79
        OVNSbClient ovs.SbClient
80

81
        // ExternalGatewayType defines the external gateway type (e.g. centralized).
82
        ExternalGatewayType string
83

84
        // Pods: lister, informer-synced check, work queues and per-key mutex.
        // The deleting*ObjMap fields map a string key to a Pod / Node object.
        podsLister             v1.PodLister
85
        podsSynced             cache.InformerSynced
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        // Vpc resources. Note that delVpcQueue carries *kubeovnv1.Vpc objects
        // rather than string keys.
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        // VpcNatGateway resources.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        // VpcEgressGateway resources.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        // SwitchLBRule resources. The update and delete queues carry *SlrInfo items.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
121
        switchLBRuleSynced      cache.InformerSynced
122
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
123
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
124
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
125

126
        // VpcDns resources.
        vpcDNSLister           kubeovnlister.VpcDnsLister
127
        vpcDNSSynced           cache.InformerSynced
128
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
129
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
130

131
        // Subnet resources. deleteSubnetQueue carries *kubeovnv1.Subnet objects.
        subnetsLister           kubeovnlister.SubnetLister
132
        subnetSynced            cache.InformerSynced
133
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
134
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
135
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
136
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
137
        subnetKeyMutex          keymutex.KeyMutex
138

139
        // IPPool resources. deleteIPPoolQueue carries *kubeovnv1.IPPool objects.
        ippoolLister            kubeovnlister.IPPoolLister
140
        ippoolSynced            cache.InformerSynced
141
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
142
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
143
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
144
        ippoolKeyMutex          keymutex.KeyMutex
145

146
        // IP resources. delIPQueue carries *kubeovnv1.IP objects.
        ipsLister     kubeovnlister.IPLister
147
        ipSynced      cache.InformerSynced
148
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
149
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
150
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
151

152
        // Vip (virtual IP) resources. delVirtualIPQueue carries *kubeovnv1.Vip objects.
        virtualIpsLister          kubeovnlister.VipLister
153
        virtualIpsSynced          cache.InformerSynced
154
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
155
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
156
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
157
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
158

159
        // IptablesEIP resources. delIptablesEipQueue carries *kubeovnv1.IptablesEIP objects.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
160
        iptablesEipSynced      cache.InformerSynced
161
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
162
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
163
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
164
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
165

166
        // IptablesFIPRule resources.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
167
        iptablesFipSynced      cache.InformerSynced
168
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
169
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
170
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
171

172
        // IptablesDnatRule resources.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
173
        iptablesDnatRuleSynced      cache.InformerSynced
174
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
175
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
176
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
177

178
        // IptablesSnatRule resources.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
179
        iptablesSnatRuleSynced      cache.InformerSynced
180
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
181
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
182
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
183

184
        // OvnEip resources. delOvnEipQueue carries *kubeovnv1.OvnEip objects.
        ovnEipsLister     kubeovnlister.OvnEipLister
185
        ovnEipSynced      cache.InformerSynced
186
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
187
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
188
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
189
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
190

191
        // OvnFip resources.
        ovnFipsLister     kubeovnlister.OvnFipLister
192
        ovnFipSynced      cache.InformerSynced
193
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
194
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
195
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
196

197
        // OvnSnatRule resources.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
198
        ovnSnatRuleSynced      cache.InformerSynced
199
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
201
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
202

203
        // OvnDnatRule resources.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
204
        ovnDnatRuleSynced      cache.InformerSynced
205
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
206
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
207
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
208

209
        // ProviderNetwork resources (lister and synced check only; no queues here).
        providerNetworksLister kubeovnlister.ProviderNetworkLister
210
        providerNetworkSynced  cache.InformerSynced
211

212
        // Vlan resources.
        vlansLister     kubeovnlister.VlanLister
213
        vlanSynced      cache.InformerSynced
214
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
215
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
216
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
217
        vlanKeyMutex    keymutex.KeyMutex
218

219
        // Namespaces.
        namespacesLister  v1.NamespaceLister
220
        namespacesSynced  cache.InformerSynced
221
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
222
        nsKeyMutex        keymutex.KeyMutex
223

224
        // Nodes.
        nodesLister     v1.NodeLister
225
        nodesSynced     cache.InformerSynced
226
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
227
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
228
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
229
        nodeKeyMutex    keymutex.KeyMutex
230

231
        // Services. The delete and update queues carry *vpcService and
        // *updateSvcObject items instead of string keys.
        servicesLister     v1.ServiceLister
232
        serviceSynced      cache.InformerSynced
233
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
234
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
235
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
236
        svcKeyMutex        keymutex.KeyMutex
237

238
        // EndpointSlices.
        endpointSlicesLister          discoveryv1.EndpointSliceLister
239
        endpointSlicesSynced          cache.InformerSynced
240
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
241
        epKeyMutex                    keymutex.KeyMutex
242

243
        // Deployments (lister and synced check only; no queues here).
        deploymentsLister appsv1.DeploymentLister
244
        deploymentsSynced cache.InformerSynced
245

246
        // NetworkPolicies.
        npsLister     netv1.NetworkPolicyLister
247
        npsSynced     cache.InformerSynced
248
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
249
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
250
        npKeyMutex    keymutex.KeyMutex
251

252
        // SecurityGroup resources.
        sgsLister          kubeovnlister.SecurityGroupLister
253
        sgSynced           cache.InformerSynced
254
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
255
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
256
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
257
        sgKeyMutex         keymutex.KeyMutex
258

259
        // QoSPolicy resources.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
260
        qosPolicySynced      cache.InformerSynced
261
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
262
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
263
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
264

265
        // ConfigMaps (lister and synced check only; no queues here).
        configMapsLister v1.ConfigMapLister
266
        configMapsSynced cache.InformerSynced
267

268
        // AdminNetworkPolicy resources. The update queue carries change deltas
        // and the delete queue carries the policy objects themselves.
        anpsLister     anplister.AdminNetworkPolicyLister
269
        anpsSynced     cache.InformerSynced
270
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
271
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
272
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
273
        anpKeyMutex    keymutex.KeyMutex
274

275
        // DNSNameResolver resources. The delete queue carries
        // *kubeovnv1.DNSNameResolver objects.
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
276
        dnsNameResolversSynced          cache.InformerSynced
277
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
278
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
279

280
        // BaselineAdminNetworkPolicy resources. The update queue reuses
        // *AdminNetworkPolicyChangedDelta as its item type.
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
281
        banpsSynced     cache.InformerSynced
282
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
283
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
284
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
285
        banpKeyMutex    keymutex.KeyMutex
286

287
        // ClusterNetworkPolicy (v1alpha2) resources.
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
288
        cnpsSynced     cache.InformerSynced
289
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
290
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
291
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
292
        cnpKeyMutex    keymutex.KeyMutex
293

294
        // CertificateSigningRequests.
        csrLister           certListerv1.CertificateSigningRequestLister
295
        csrSynced           cache.InformerSynced
296
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
297

298
        // KubeVirt-related work queues and informer factory.
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
299
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
300
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
301

302
        // NetworkAttachmentDefinition lister, synced check and informer factory.
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
303
        netAttachSynced          cache.InformerSynced
304
        netAttachInformerFactory netAttach.SharedInformerFactory
305

306
        // Event recorder and shared informer factories.
        recorder               record.EventRecorder
307
        informerFactory        kubeinformers.SharedInformerFactory
308
        cmInformerFactory      kubeinformers.SharedInformerFactory
309
        deployInformerFactory  kubeinformers.SharedInformerFactory
310
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
311
        anpInformerFactory     anpinformer.SharedInformerFactory
312

313
        // Database health check
314
        dbFailureCount int
315

316
        // Atomic flag; presumably set when distributed subnets need to be
        // re-synced — confirm at the use sites.
        distributedSubnetNeedSync atomic.Bool
317
}
318

319
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
320
        if rateLimiter == nil {
2✔
321
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
322
        }
1✔
323
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
324
}
325

326
// Run creates and runs a new ovn controller
327
func Run(ctx context.Context, config *Configuration) {
×
328
        klog.V(4).Info("Creating event broadcaster")
×
329
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
330
        eventBroadcaster.StartLogging(klog.Infof)
×
331
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
332
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
333
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
334
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
335
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
336
        )
×
337

×
338
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
339
        if err != nil {
×
340
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
341
        }
×
342

343
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
344
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
345
                        listOption.AllowWatchBookmarks = true
×
346
                }))
×
347
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
348
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
349
                        listOption.AllowWatchBookmarks = true
×
350
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
351
        // deployment informer used to list/watch vpc egress gateway workloads
352
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
353
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
354
                        listOption.AllowWatchBookmarks = true
×
355
                        listOption.LabelSelector = selector.String()
×
356
                }))
×
357
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
358
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
359
                        listOption.AllowWatchBookmarks = true
×
360
                }))
×
361
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
362
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
363
                        listOption.AllowWatchBookmarks = true
×
364
                }))
×
365

366
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
367
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
368
                        listOption.AllowWatchBookmarks = true
×
369
                }),
×
370
        )
371

372
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
373

×
374
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
375
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
376
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
377
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
378
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
379
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
380
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
381
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
382
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
383
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
384
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
385
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
386
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
387
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
388
        podInformer := informerFactory.Core().V1().Pods()
×
389
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
390
        nodeInformer := informerFactory.Core().V1().Nodes()
×
391
        serviceInformer := informerFactory.Core().V1().Services()
×
392
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
393
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
394
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
395
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
396
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
397
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
398
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
399
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
400
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
401
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
402
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
403
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
404
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
405
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
406
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
407
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
408
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
409

×
410
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
411
        controller := &Controller{
×
412
                config:             config,
×
413
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
414
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
415
                ipam:               ovnipam.NewIPAM(),
×
416
                namedPort:          NewNamedPort(),
×
417

×
418
                vpcsLister:           vpcInformer.Lister(),
×
419
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
420
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
421
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
422
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
423
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
424
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
425

×
426
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
427
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
428
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
429
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
430
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
431
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
432
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
433
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
434
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
435
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
436
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
437
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
438
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
439
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
440
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
441
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
442
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
443

×
444
                subnetsLister:           subnetInformer.Lister(),
×
445
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
446
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
447
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
448
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
449
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
450
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
451

×
452
                ippoolLister:            ippoolInformer.Lister(),
×
453
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
454
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
455
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
456
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
457
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
458

×
459
                ipsLister:     ipInformer.Lister(),
×
460
                ipSynced:      ipInformer.Informer().HasSynced,
×
461
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
462
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
463
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
464

×
465
                virtualIpsLister:          virtualIPInformer.Lister(),
×
466
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
467
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
468
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
469
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
470
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
471

×
472
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
473
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
474
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
475
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
476
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
477
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
478

×
479
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
480
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
481
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
482
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
483
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
484

×
485
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
486
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
487
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
488
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
489
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
490

×
491
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
492
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
493
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
494
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
495
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
496

×
497
                vlansLister:     vlanInformer.Lister(),
×
498
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
499
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
500
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
501
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
502
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
503

×
504
                providerNetworksLister: providerNetworkInformer.Lister(),
×
505
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
506

×
507
                podsLister:          podInformer.Lister(),
×
508
                podsSynced:          podInformer.Informer().HasSynced,
×
509
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
510
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
511
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
512
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
513
                                Name:          "DeletePod",
×
514
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
515
                        },
×
516
                ),
×
517
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
518
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
519

×
520
                namespacesLister:  namespaceInformer.Lister(),
×
521
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
522
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
523
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
524

×
525
                nodesLister:     nodeInformer.Lister(),
×
526
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
527
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
528
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
529
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
530
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
531

×
532
                servicesLister:     serviceInformer.Lister(),
×
533
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
534
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
535
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
536
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
537
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
538

×
539
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
540
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
541
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
542
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
543

×
544
                deploymentsLister: deploymentInformer.Lister(),
×
545
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
546

×
547
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
548
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
549
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
550
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
551
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
552

×
553
                configMapsLister: configMapInformer.Lister(),
×
554
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
555

×
556
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
557
                sgsLister:          sgInformer.Lister(),
×
558
                sgSynced:           sgInformer.Informer().HasSynced,
×
559
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
560
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
561
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
562

×
563
                ovnEipsLister:     ovnEipInformer.Lister(),
×
564
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
565
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
566
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
567
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
568
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
569

×
570
                ovnFipsLister:     ovnFipInformer.Lister(),
×
571
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
572
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
573
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
574
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
575

×
576
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
577
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
578
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
579
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
580
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
581

×
582
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
583
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
584
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
585
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
586
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
587

×
588
                csrLister:           csrInformer.Lister(),
×
589
                csrSynced:           csrInformer.Informer().HasSynced,
×
590
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
591

×
592
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
593
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
594
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
595

×
596
                netAttachLister:          netAttachInformer.Lister(),
×
597
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
598
                netAttachInformerFactory: attachNetInformerFactory,
×
599

×
600
                recorder:               recorder,
×
601
                informerFactory:        informerFactory,
×
602
                cmInformerFactory:      cmInformerFactory,
×
603
                deployInformerFactory:  deployInformerFactory,
×
604
                kubeovnInformerFactory: kubeovnInformerFactory,
×
605
                anpInformerFactory:     anpInformerFactory,
×
606
        }
×
607

×
608
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
609
                config.OvnNbAddr,
×
610
                config.OvnTimeout,
×
611
                config.OvsDbConnectTimeout,
×
612
                config.OvsDbInactivityTimeout,
×
613
                config.OvsDbConnectMaxRetry,
×
614
        ); err != nil {
×
615
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
616
        }
×
617
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
618
                config.OvnSbAddr,
×
619
                config.OvnTimeout,
×
620
                config.OvsDbConnectTimeout,
×
621
                config.OvsDbInactivityTimeout,
×
622
                config.OvsDbConnectMaxRetry,
×
623
        ); err != nil {
×
624
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
625
        }
×
626
        if config.EnableLb {
×
627
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
628
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
629
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
630
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
631
                        "DeleteSwitchLBRule",
×
632
                        workqueue.NewTypedMaxOfRateLimiter(
×
633
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
634
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
635
                        ),
×
636
                )
×
637
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
638
                        "UpdateSwitchLBRule",
×
639
                        workqueue.NewTypedMaxOfRateLimiter(
×
640
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
641
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
642
                        ),
×
643
                )
×
644

×
645
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
646
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
647
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
648
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
649
        }
×
650

651
        if config.EnableNP {
×
652
                controller.npsLister = npInformer.Lister()
×
653
                controller.npsSynced = npInformer.Informer().HasSynced
×
654
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
655
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
656
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
657
        }
×
658

659
        if config.EnableANP {
×
660
                controller.anpsLister = anpInformer.Lister()
×
661
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
662
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
663
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
664
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
665
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
666

×
667
                controller.banpsLister = banpInformer.Lister()
×
668
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
669
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
670
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
671
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
672
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
673

×
674
                controller.cnpsLister = cnpInformer.Lister()
×
675
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
676
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
677
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
678
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
679
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
680
        }
×
681

682
        if config.EnableDNSNameResolver {
×
683
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
684
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
685
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
686
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
687
        }
×
688

689
        defer controller.shutdown()
×
690
        klog.Info("Starting OVN controller")
×
691

×
NEW
692
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
NEW
693
        // NAD CRD is optional, so we check if it exists before starting the informer
×
NEW
694
        controller.StartNetAttachInformerFactory(ctx)
×
NEW
695

×
696
        // Wait for the caches to be synced before starting workers
×
697
        controller.informerFactory.Start(ctx.Done())
×
698
        controller.cmInformerFactory.Start(ctx.Done())
×
699
        controller.deployInformerFactory.Start(ctx.Done())
×
700
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
701
        controller.anpInformerFactory.Start(ctx.Done())
×
702
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
703

×
704
        klog.Info("Waiting for informer caches to sync")
×
705
        cacheSyncs := []cache.InformerSynced{
×
706
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
707
                controller.vpcSynced, controller.subnetSynced,
×
708
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
709
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
710
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
711
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
712
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
713
                controller.ovnDnatRuleSynced,
×
714
        }
×
715
        if controller.config.EnableLb {
×
716
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
717
        }
×
718
        if controller.config.EnableNP {
×
719
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
720
        }
×
721
        if controller.config.EnableANP {
×
722
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
723
        }
×
724
        if controller.config.EnableDNSNameResolver {
×
725
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
726
        }
×
727

728
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
729
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
730
        }
×
731

732
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
733
                AddFunc:    controller.enqueueAddPod,
×
734
                DeleteFunc: controller.enqueueDeletePod,
×
735
                UpdateFunc: controller.enqueueUpdatePod,
×
736
        }); err != nil {
×
737
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
738
        }
×
739

740
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
741
                AddFunc:    controller.enqueueAddNamespace,
×
742
                UpdateFunc: controller.enqueueUpdateNamespace,
×
743
                DeleteFunc: controller.enqueueDeleteNamespace,
×
744
        }); err != nil {
×
745
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
746
        }
×
747

748
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
749
                AddFunc:    controller.enqueueAddNode,
×
750
                UpdateFunc: controller.enqueueUpdateNode,
×
751
                DeleteFunc: controller.enqueueDeleteNode,
×
752
        }); err != nil {
×
753
                util.LogFatalAndExit(err, "failed to add node event handler")
×
754
        }
×
755

756
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
757
                AddFunc:    controller.enqueueAddService,
×
758
                DeleteFunc: controller.enqueueDeleteService,
×
759
                UpdateFunc: controller.enqueueUpdateService,
×
760
        }); err != nil {
×
761
                util.LogFatalAndExit(err, "failed to add service event handler")
×
762
        }
×
763

764
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
765
                AddFunc:    controller.enqueueAddEndpointSlice,
×
766
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
767
        }); err != nil {
×
768
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
769
        }
×
770

771
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
772
                AddFunc:    controller.enqueueAddDeployment,
×
773
                UpdateFunc: controller.enqueueUpdateDeployment,
×
774
        }); err != nil {
×
775
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
776
        }
×
777

778
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
779
                AddFunc:    controller.enqueueAddVpc,
×
780
                UpdateFunc: controller.enqueueUpdateVpc,
×
781
                DeleteFunc: controller.enqueueDelVpc,
×
782
        }); err != nil {
×
783
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
784
        }
×
785

786
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
787
                AddFunc:    controller.enqueueAddVpcNatGw,
×
788
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
789
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
790
        }); err != nil {
×
791
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
792
        }
×
793

794
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
795
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
796
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
797
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
798
        }); err != nil {
×
799
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
800
        }
×
801

802
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
803
                AddFunc:    controller.enqueueAddSubnet,
×
804
                UpdateFunc: controller.enqueueUpdateSubnet,
×
805
                DeleteFunc: controller.enqueueDeleteSubnet,
×
806
        }); err != nil {
×
807
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
808
        }
×
809

810
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
811
                AddFunc:    controller.enqueueAddIPPool,
×
812
                UpdateFunc: controller.enqueueUpdateIPPool,
×
813
                DeleteFunc: controller.enqueueDeleteIPPool,
×
814
        }); err != nil {
×
815
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
816
        }
×
817

818
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
819
                AddFunc:    controller.enqueueAddIP,
×
820
                UpdateFunc: controller.enqueueUpdateIP,
×
821
                DeleteFunc: controller.enqueueDelIP,
×
822
        }); err != nil {
×
823
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
824
        }
×
825

826
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
827
                AddFunc:    controller.enqueueAddVlan,
×
828
                DeleteFunc: controller.enqueueDelVlan,
×
829
                UpdateFunc: controller.enqueueUpdateVlan,
×
830
        }); err != nil {
×
831
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
832
        }
×
833

834
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
835
                AddFunc:    controller.enqueueAddSg,
×
836
                DeleteFunc: controller.enqueueDeleteSg,
×
837
                UpdateFunc: controller.enqueueUpdateSg,
×
838
        }); err != nil {
×
839
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
840
        }
×
841

842
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
843
                AddFunc:    controller.enqueueAddVirtualIP,
×
844
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
845
                DeleteFunc: controller.enqueueDelVirtualIP,
×
846
        }); err != nil {
×
847
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
848
        }
×
849

850
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
851
                AddFunc:    controller.enqueueAddIptablesEip,
×
852
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
853
                DeleteFunc: controller.enqueueDelIptablesEip,
×
854
        }); err != nil {
×
855
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
856
        }
×
857

858
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
859
                AddFunc:    controller.enqueueAddIptablesFip,
×
860
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
861
                DeleteFunc: controller.enqueueDelIptablesFip,
×
862
        }); err != nil {
×
863
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
864
        }
×
865

866
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
867
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
868
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
869
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
870
        }); err != nil {
×
871
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
872
        }
×
873

874
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
875
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
876
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
877
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
878
        }); err != nil {
×
879
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
880
        }
×
881

882
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
883
                AddFunc:    controller.enqueueAddOvnEip,
×
884
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
885
                DeleteFunc: controller.enqueueDelOvnEip,
×
886
        }); err != nil {
×
887
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
888
        }
×
889

890
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
891
                AddFunc:    controller.enqueueAddOvnFip,
×
892
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
893
                DeleteFunc: controller.enqueueDelOvnFip,
×
894
        }); err != nil {
×
895
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
896
        }
×
897

898
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
899
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
900
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
901
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
902
        }); err != nil {
×
903
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
904
        }
×
905

906
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
907
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
908
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
909
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
910
        }); err != nil {
×
911
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
912
        }
×
913

914
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
915
                AddFunc:    controller.enqueueAddQoSPolicy,
×
916
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
917
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
918
        }); err != nil {
×
919
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
920
        }
×
921

922
        if config.EnableLb {
×
923
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
924
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
925
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
926
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
927
                }); err != nil {
×
928
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
929
                }
×
930

931
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
932
                        AddFunc:    controller.enqueueAddVpcDNS,
×
933
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
934
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
935
                }); err != nil {
×
936
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
937
                }
×
938
        }
939

940
        if config.EnableNP {
×
941
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
942
                        AddFunc:    controller.enqueueAddNp,
×
943
                        UpdateFunc: controller.enqueueUpdateNp,
×
944
                        DeleteFunc: controller.enqueueDeleteNp,
×
945
                }); err != nil {
×
946
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
947
                }
×
948
        }
949

950
        if config.EnableANP {
×
951
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
952
                        AddFunc:    controller.enqueueAddAnp,
×
953
                        UpdateFunc: controller.enqueueUpdateAnp,
×
954
                        DeleteFunc: controller.enqueueDeleteAnp,
×
955
                }); err != nil {
×
956
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
957
                }
×
958

959
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
960
                        AddFunc:    controller.enqueueAddBanp,
×
961
                        UpdateFunc: controller.enqueueUpdateBanp,
×
962
                        DeleteFunc: controller.enqueueDeleteBanp,
×
963
                }); err != nil {
×
964
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
965
                }
×
966

967
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
968
                        AddFunc:    controller.enqueueAddCnp,
×
969
                        UpdateFunc: controller.enqueueUpdateCnp,
×
970
                        DeleteFunc: controller.enqueueDeleteCnp,
×
971
                }); err != nil {
×
972
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
973
                }
×
974

975
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
976
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
977
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
978
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
979
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
980
        }
981

982
        if config.EnableDNSNameResolver {
×
983
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
984
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
985
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
986
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
987
                }); err != nil {
×
988
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
989
                }
×
990
        }
991

992
        if config.EnableOVNIPSec {
×
993
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
994
                        AddFunc:    controller.enqueueAddCsr,
×
995
                        UpdateFunc: controller.enqueueUpdateCsr,
×
996
                        // no need to add delete func for csr
×
997
                }); err != nil {
×
998
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
999
                }
×
1000
        }
1001

1002
        controller.Run(ctx)
×
1003
}
1004

1005
// Run will set up the event handlers for types we are interested in, as well
1006
// as syncing informer caches and starting workers. It will block until stopCh
1007
// is closed, at which point it will shutdown the workqueue and wait for
1008
// workers to finish processing their current work items.
1009
func (c *Controller) Run(ctx context.Context) {
×
1010
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1011
        // Otherwise, the init process should be placed after all workers have already started working
×
1012
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1013
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1014
        }
×
1015

1016
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1017
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1018
        }
×
1019

1020
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1021
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1022
        }
×
1023

1024
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1025
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1026
        }
×
1027

1028
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1029
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1030
        }
×
1031

1032
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1033
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1034
        }
×
1035

1036
        if err := c.InitOVN(); err != nil {
×
1037
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1038
        }
×
1039

1040
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1041
        if err := c.syncIPCR(); err != nil {
×
1042
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1043
        }
×
1044

1045
        if err := c.syncFinalizers(); err != nil {
×
1046
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1047
        }
×
1048

1049
        if err := c.InitIPAM(); err != nil {
×
1050
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1051
        }
×
1052

1053
        if err := c.syncNodeRoutes(); err != nil {
×
1054
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1055
        }
×
1056

1057
        if err := c.syncSubnetCR(); err != nil {
×
1058
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1059
        }
×
1060

1061
        if err := c.syncVlanCR(); err != nil {
×
1062
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1063
        }
×
1064

1065
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1066
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1067
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1068
                }
×
1069
        }
1070

1071
        // start workers to do all the network operations
1072
        c.startWorkers(ctx)
×
1073

×
1074
        c.initResourceOnce()
×
1075
        <-ctx.Done()
×
1076
        klog.Info("Shutting down workers")
×
1077
}
1078

1079
func (c *Controller) dbStatus() {
×
1080
        const maxFailures = 5
×
1081

×
1082
        done := make(chan error, 2)
×
1083
        go func() {
×
1084
                done <- c.OVNNbClient.Echo(context.Background())
×
1085
        }()
×
1086
        go func() {
×
1087
                done <- c.OVNSbClient.Echo(context.Background())
×
1088
        }()
×
1089

1090
        resultsReceived := 0
×
1091
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1092

×
1093
        for resultsReceived < 2 {
×
1094
                select {
×
1095
                case err := <-done:
×
1096
                        resultsReceived++
×
1097
                        if err != nil {
×
1098
                                c.dbFailureCount++
×
1099
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1100
                                if c.dbFailureCount >= maxFailures {
×
1101
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1102
                                }
×
1103
                                return
×
1104
                        }
1105
                case <-timeout:
×
1106
                        c.dbFailureCount++
×
1107
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1108
                        if c.dbFailureCount >= maxFailures {
×
1109
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1110
                        }
×
1111
                        return
×
1112
                }
1113
        }
1114

1115
        if c.dbFailureCount > 0 {
×
1116
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1117
                c.dbFailureCount = 0
×
1118
        }
×
1119
}
1120

1121
func (c *Controller) shutdown() {
×
1122
        utilruntime.HandleCrash()
×
1123

×
1124
        c.addOrUpdatePodQueue.ShutDown()
×
1125
        c.deletePodQueue.ShutDown()
×
1126
        c.updatePodSecurityQueue.ShutDown()
×
1127

×
1128
        c.addNamespaceQueue.ShutDown()
×
1129

×
1130
        c.addOrUpdateSubnetQueue.ShutDown()
×
1131
        c.deleteSubnetQueue.ShutDown()
×
1132
        c.updateSubnetStatusQueue.ShutDown()
×
1133
        c.syncVirtualPortsQueue.ShutDown()
×
1134

×
1135
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1136
        c.updateIPPoolStatusQueue.ShutDown()
×
1137
        c.deleteIPPoolQueue.ShutDown()
×
1138

×
1139
        c.addNodeQueue.ShutDown()
×
1140
        c.updateNodeQueue.ShutDown()
×
1141
        c.deleteNodeQueue.ShutDown()
×
1142

×
1143
        c.addServiceQueue.ShutDown()
×
1144
        c.deleteServiceQueue.ShutDown()
×
1145
        c.updateServiceQueue.ShutDown()
×
1146
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1147

×
1148
        c.addVlanQueue.ShutDown()
×
1149
        c.delVlanQueue.ShutDown()
×
1150
        c.updateVlanQueue.ShutDown()
×
1151

×
1152
        c.addOrUpdateVpcQueue.ShutDown()
×
1153
        c.updateVpcStatusQueue.ShutDown()
×
1154
        c.delVpcQueue.ShutDown()
×
1155

×
1156
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1157
        c.initVpcNatGatewayQueue.ShutDown()
×
1158
        c.delVpcNatGatewayQueue.ShutDown()
×
1159
        c.updateVpcEipQueue.ShutDown()
×
1160
        c.updateVpcFloatingIPQueue.ShutDown()
×
1161
        c.updateVpcDnatQueue.ShutDown()
×
1162
        c.updateVpcSnatQueue.ShutDown()
×
1163
        c.updateVpcSubnetQueue.ShutDown()
×
1164

×
1165
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1166
        c.delVpcEgressGatewayQueue.ShutDown()
×
1167

×
1168
        if c.config.EnableLb {
×
1169
                c.addSwitchLBRuleQueue.ShutDown()
×
1170
                c.delSwitchLBRuleQueue.ShutDown()
×
1171
                c.updateSwitchLBRuleQueue.ShutDown()
×
1172

×
1173
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1174
                c.delVpcDNSQueue.ShutDown()
×
1175
        }
×
1176

1177
        c.addIPQueue.ShutDown()
×
1178
        c.updateIPQueue.ShutDown()
×
1179
        c.delIPQueue.ShutDown()
×
1180

×
1181
        c.addVirtualIPQueue.ShutDown()
×
1182
        c.updateVirtualIPQueue.ShutDown()
×
1183
        c.updateVirtualParentsQueue.ShutDown()
×
1184
        c.delVirtualIPQueue.ShutDown()
×
1185

×
1186
        c.addIptablesEipQueue.ShutDown()
×
1187
        c.updateIptablesEipQueue.ShutDown()
×
1188
        c.resetIptablesEipQueue.ShutDown()
×
1189
        c.delIptablesEipQueue.ShutDown()
×
1190

×
1191
        c.addIptablesFipQueue.ShutDown()
×
1192
        c.updateIptablesFipQueue.ShutDown()
×
1193
        c.delIptablesFipQueue.ShutDown()
×
1194

×
1195
        c.addIptablesDnatRuleQueue.ShutDown()
×
1196
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1197
        c.delIptablesDnatRuleQueue.ShutDown()
×
1198

×
1199
        c.addIptablesSnatRuleQueue.ShutDown()
×
1200
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1201
        c.delIptablesSnatRuleQueue.ShutDown()
×
1202

×
1203
        c.addQoSPolicyQueue.ShutDown()
×
1204
        c.updateQoSPolicyQueue.ShutDown()
×
1205
        c.delQoSPolicyQueue.ShutDown()
×
1206

×
1207
        c.addOvnEipQueue.ShutDown()
×
1208
        c.updateOvnEipQueue.ShutDown()
×
1209
        c.resetOvnEipQueue.ShutDown()
×
1210
        c.delOvnEipQueue.ShutDown()
×
1211

×
1212
        c.addOvnFipQueue.ShutDown()
×
1213
        c.updateOvnFipQueue.ShutDown()
×
1214
        c.delOvnFipQueue.ShutDown()
×
1215

×
1216
        c.addOvnSnatRuleQueue.ShutDown()
×
1217
        c.updateOvnSnatRuleQueue.ShutDown()
×
1218
        c.delOvnSnatRuleQueue.ShutDown()
×
1219

×
1220
        c.addOvnDnatRuleQueue.ShutDown()
×
1221
        c.updateOvnDnatRuleQueue.ShutDown()
×
1222
        c.delOvnDnatRuleQueue.ShutDown()
×
1223

×
1224
        if c.config.EnableNP {
×
1225
                c.updateNpQueue.ShutDown()
×
1226
                c.deleteNpQueue.ShutDown()
×
1227
        }
×
1228
        if c.config.EnableANP {
×
1229
                c.addAnpQueue.ShutDown()
×
1230
                c.updateAnpQueue.ShutDown()
×
1231
                c.deleteAnpQueue.ShutDown()
×
1232

×
1233
                c.addBanpQueue.ShutDown()
×
1234
                c.updateBanpQueue.ShutDown()
×
1235
                c.deleteBanpQueue.ShutDown()
×
1236

×
1237
                c.addCnpQueue.ShutDown()
×
1238
                c.updateCnpQueue.ShutDown()
×
1239
                c.deleteCnpQueue.ShutDown()
×
1240
        }
×
1241

1242
        if c.config.EnableDNSNameResolver {
×
1243
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1244
                c.deleteDNSNameResolverQueue.ShutDown()
×
1245
        }
×
1246

1247
        c.addOrUpdateSgQueue.ShutDown()
×
1248
        c.delSgQueue.ShutDown()
×
1249
        c.syncSgPortsQueue.ShutDown()
×
1250

×
1251
        c.addOrUpdateCsrQueue.ShutDown()
×
1252

×
1253
        if c.config.EnableLiveMigrationOptimize {
×
1254
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1255
        }
×
1256
}
1257

1258
func (c *Controller) startWorkers(ctx context.Context) {
×
1259
        klog.Info("Starting workers")
×
1260

×
1261
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1262
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1263
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1264

×
1265
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1266
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1267
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1268
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1269
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1272
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1273
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1274
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1276
        // add default and join subnet and wait them ready
×
1277
        for range c.config.WorkerNum {
×
1278
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1279
        }
×
1280
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1281
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1282
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1283
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1284
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1285
                klog.Infof("wait for subnets %v ready", subnets)
×
1286

×
1287
                return c.allSubnetReady(subnets...)
×
1288
        })
×
1289
        if err != nil {
×
1290
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1291
        }
×
1292

1293
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1294
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1295
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1296

×
1297
        // run node worker before handle any pods
×
1298
        for range c.config.WorkerNum {
×
1299
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1300
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1301
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1302
        }
×
1303
        for {
×
1304
                ready := true
×
1305
                time.Sleep(3 * time.Second)
×
1306
                nodes, err := c.nodesLister.List(labels.Everything())
×
1307
                if err != nil {
×
1308
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1309
                }
×
1310
                for _, node := range nodes {
×
1311
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1312
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1313
                                ready = false
×
1314
                                break
×
1315
                        }
1316
                }
1317
                if ready {
×
1318
                        break
×
1319
                }
1320
        }
1321

1322
        if c.config.EnableLb {
×
1323
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1324
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1325
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1326

×
1327
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1328
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1329
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1330

×
1331
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1332
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1333
                go wait.Until(func() {
×
1334
                        c.resyncVpcDNSConfig()
×
1335
                }, 5*time.Second, ctx.Done())
×
1336
        }
1337

1338
        for range c.config.WorkerNum {
×
1339
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1340
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1341
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1342

×
1343
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1344
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1345
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1346
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1347
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1348

×
1349
                if c.config.EnableLb {
×
1350
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1351
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1352
                }
×
1353

1354
                if c.config.EnableNP {
×
1355
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1356
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1357
                }
×
1358

1359
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1360
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1361
        }
1362

1363
        if c.config.EnableEipSnat {
×
1364
                go wait.Until(func() {
×
1365
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1366
                        c.resyncExternalGateway()
×
1367
                }, time.Second, ctx.Done())
×
1368

1369
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1370
                c.OVNNbClient.MonitorBFD()
×
1371
        }
1372
        // TODO: we should merge these two vpc nat config into one config and resync them together
1373
        go wait.Until(func() {
×
1374
                c.resyncVpcNatGwConfig()
×
1375
        }, time.Second, ctx.Done())
×
1376

1377
        go wait.Until(func() {
×
1378
                c.resyncVpcNatConfig()
×
1379
        }, time.Second, ctx.Done())
×
1380

1381
        if c.config.GCInterval != 0 {
×
1382
                go wait.Until(func() {
×
1383
                        if err := c.markAndCleanLSP(); err != nil {
×
1384
                                klog.Errorf("gc lsp error: %v", err)
×
1385
                        }
×
1386
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1387
        }
1388

1389
        go wait.Until(func() {
×
1390
                if err := c.inspectPod(); err != nil {
×
1391
                        klog.Errorf("inspection error: %v", err)
×
1392
                }
×
1393
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1394

1395
        if c.config.EnableExternalVpc {
×
1396
                go wait.Until(func() {
×
1397
                        c.syncExternalVpc()
×
1398
                }, 5*time.Second, ctx.Done())
×
1399
        }
1400

1401
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1402
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1403
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1404
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1405

×
1406
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1407
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1408
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1409
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1410

×
1411
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1412
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1413
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1414

×
1415
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1416
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1417
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1418

×
1419
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1420
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1421
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1422

×
1423
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1424

×
1425
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1426
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1427
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1428

×
1429
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1430
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1431
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1432
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1433

×
1434
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1436
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1437
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1438

×
1439
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1441
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1442

×
1443
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1444
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1446

×
1447
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1448
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1450

×
1451
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1452
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1454

×
1455
        if c.config.EnableANP {
×
1456
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1457
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1458
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1459

×
1460
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1461
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1462
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1463

×
1464
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1465
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1467
        }
×
1468

1469
        if c.config.EnableDNSNameResolver {
×
1470
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1471
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1472
        }
×
1473

1474
        if c.config.EnableLiveMigrationOptimize {
×
1475
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1476
        }
×
1477

1478
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1479

×
1480
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1481
}
1482

1483
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1484
        for _, lsName := range subnets {
2✔
1485
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1486
                if err != nil {
1✔
1487
                        klog.Error(err)
×
1488
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1489
                }
×
1490

1491
                if !exist {
2✔
1492
                        return false, nil
1✔
1493
                }
1✔
1494
        }
1495

1496
        return true, nil
1✔
1497
}
1498

1499
func (c *Controller) initResourceOnce() {
×
1500
        c.registerSubnetMetrics()
×
1501

×
1502
        if err := c.initNodeChassis(); err != nil {
×
1503
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1504
        }
×
1505

1506
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1507
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1508
        }
×
1509
        if err := c.syncSecurityGroup(); err != nil {
×
1510
                util.LogFatalAndExit(err, "failed to sync security group")
×
1511
        }
×
1512

1513
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1514
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1515
        }
×
1516

1517
        if err := c.initVpcNatGw(); err != nil {
×
1518
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1519
        }
×
1520
        if c.config.EnableLb {
×
1521
                if err := c.initVpcDNSConfig(); err != nil {
×
1522
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1523
                }
×
1524
        }
1525

1526
        // remove resources in ovndb that not exist any more in kubernetes resources
1527
        // process gc at last in case of affecting other init process
1528
        if err := c.gc(); err != nil {
×
1529
                util.LogFatalAndExit(err, "failed to run gc")
×
1530
        }
×
1531
}
1532

1533
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1534
        item, shutdown := queue.Get()
×
1535
        if shutdown {
×
1536
                return false
×
1537
        }
×
1538

1539
        err := func(item T) error {
×
1540
                defer queue.Done(item)
×
1541
                if err := handler(item); err != nil {
×
1542
                        queue.AddRateLimited(item)
×
1543
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1544
                }
×
1545
                queue.Forget(item)
×
1546
                return nil
×
1547
        }(item)
1548
        if err != nil {
×
1549
                utilruntime.HandleError(err)
×
1550
                return true
×
1551
        }
×
1552
        return true
×
1553
}
1554

1555
func getWorkItemKey(obj any) string {
×
1556
        switch v := obj.(type) {
×
1557
        case string:
×
1558
                return v
×
1559
        case *vpcService:
×
1560
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1561
        case *AdminNetworkPolicyChangedDelta:
×
1562
                return v.key
×
1563
        case *SlrInfo:
×
1564
                return v.Name
×
1565
        default:
×
1566
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1567
                if err != nil {
×
1568
                        utilruntime.HandleError(err)
×
1569
                        return ""
×
1570
                }
×
1571
                return key
×
1572
        }
1573
}
1574

1575
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1576
        return func() {
×
1577
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1578
                }
×
1579
        }
1580
}
1581

1582
// apiResourceExists checks if all specified kinds exist in the given group version.
1583
// It returns true if all kinds are found, false otherwise.
1584
// Parameters:
1585
// - discoveryClient: The discovery client to use for querying API resources.
1586
// - gv: The group version string (e.g., "apps/v1").
1587
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1588
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1589
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1590
        if err != nil {
×
1591
                if k8serrors.IsNotFound(err) {
×
1592
                        return false, nil
×
1593
                }
×
1594
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1595
        }
1596

1597
        existingKinds := set.New[string]()
×
1598
        for _, apiResource := range apiResourceLists.APIResources {
×
1599
                existingKinds.Insert(apiResource.Kind)
×
1600
        }
×
1601

1602
        return existingKinds.HasAll(kinds...), nil
×
1603
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc