• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27498288476

14 Jun 2026 12:05PM UTC coverage: 26.649% (+0.3%) from 26.341%
27498288476

push

github

web-flow
feat(vpcnatgateway): add high-availability for nat gateways (#6661)

First phase of adding HA NAT gateways to Kube-OVN

461 of 1413 new or added lines in 13 files covered. (32.63%)

26 existing lines in 4 files now uncovered.

15874 of 59566 relevant lines covered (26.65%)

0.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.08
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        appsv1api "k8s.io/api/apps/v1"
17
        corev1 "k8s.io/api/core/v1"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        appsv1 "k8s.io/client-go/listers/apps/v1"
26
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
27
        v1 "k8s.io/client-go/listers/core/v1"
28
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
29
        netv1 "k8s.io/client-go/listers/networking/v1"
30
        "k8s.io/client-go/tools/cache"
31
        "k8s.io/client-go/tools/record"
32
        "k8s.io/client-go/util/workqueue"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/keymutex"
35
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
36
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
37
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
38
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
39
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
40

41
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
42
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
43
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
44
        "github.com/kubeovn/kube-ovn/pkg/informer"
45
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
46
        "github.com/kubeovn/kube-ovn/pkg/ovs"
47
        "github.com/kubeovn/kube-ovn/pkg/util"
48
)
49

50
// controllerAgentName is the component name used as the event source
// when Run creates the event recorder.
const controllerAgentName = "kube-ovn-controller"
51

52
// Short string keys used by the controller package.
// NOTE(review): their consumers are outside this chunk; presumably they are
// used as tag / external-ID keys on OVN objects — confirm against callers.
const (
53
        logicalSwitchKey              = "ls"
54
        logicalRouterKey              = "lr"
55
        portGroupKey                  = "pg"
56
        networkPolicyKey              = "np"
57
        sgKey                         = "sg"
58
        sgsKey                        = "security_groups"
59
        u2oKey                        = "u2o"
60
        adminNetworkPolicyKey         = "anp"
61
        baselineAdminNetworkPolicyKey = "banp"
62
        ippoolKey                     = "ippool"
63
        clusterNetworkPolicyKey       = "cnp"
64
)
65

66
// Controller is the main kube-ovn controller. It watches namespaces, pods,
// nodes, services and endpoints (among other resources) and operates OVN.
//
// For each watched resource kind the struct keeps some combination of a
// lister, an InformerSynced check, typed rate-limiting work queues and a
// key mutex.
67
type Controller struct {
68
        // config is the configuration handed to Run.
        config *Configuration
69

70
        // IPAM, named-port tracking, and priority<->name lookup maps.
        // NOTE(review): the anp*/bnp* maps presumably serve admin / baseline admin
        // network policies and priorityMapMutex presumably guards them — confirm.
        ipam             *ovnipam.IPAM
71
        namedPort        *NamedPort
72
        anpPrioNameMap   map[int32]string
73
        anpNamePrioMap   map[string]int32
74
        bnpPrioNameMap   map[int32]string
75
        bnpNamePrioMap   map[string]int32
76
        priorityMapMutex sync.RWMutex
77

78
        // Clients for the OVN NB and SB databases.
        OVNNbClient ovs.NbClient
79
        OVNSbClient ovs.SbClient
80

81
        // ExternalGatewayType defines the external gateway type, e.g. centralized.
82
        ExternalGatewayType string
83

84
        // Pods: lister, informer sync check, indexer, work queues and key mutex.
        // deletingPodObjMap / deletingNodeObjMap hold Pod / Node objects keyed by string.
        podsLister             v1.PodLister
85
        podsSynced             cache.InformerSynced
86
        podIndexer             cache.Indexer
87
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
88
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
89
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
90
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
91
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
92
        podKeyMutex            keymutex.KeyMutex
93

94
        // VPCs.
        vpcsLister           kubeovnlister.VpcLister
95
        vpcSynced            cache.InformerSynced
96
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
97
        vpcLastPoliciesMap   *xsync.Map[string, string]
98
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
99
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
100
        vpcKeyMutex          keymutex.KeyMutex
101

102
        // VPC NAT gateways, plus queues for their EIP / floating IP / DNAT / SNAT /
        // subnet updates.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
103
        vpcNatGatewaySynced           cache.InformerSynced
104
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
105
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
106
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
107
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
108
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
109
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
112
        vpcNatGwKeyMutex              keymutex.KeyMutex
113
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
114

115
        // VPC egress gateways.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
116
        vpcEgressGatewaySynced           cache.InformerSynced
117
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
118
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
119
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
120

121
        // bgpConfLister/evpnConfLister are published asynchronously by the
122
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
123
        // read by VEG worker goroutines. Atomic pointers keep the read/write
124
        // race-free without locking the hot reconcile path.
125
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
126
        bgpConfSynced  cache.InformerSynced
127
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
128
        evpnConfSynced cache.InformerSynced
129

130
        // Router LB rules.
        routerLBRuleLister      kubeovnlister.RouterLBRuleLister
131
        routerLBRuleSynced      cache.InformerSynced
132
        addRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
133
        updateRouterLBRuleQueue workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
134
        delRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
135

136
        // Switch LB rules.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
137
        switchLBRuleSynced      cache.InformerSynced
138
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
139
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
140
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
141

142
        // VPC DNS.
        vpcDNSLister           kubeovnlister.VpcDnsLister
143
        vpcDNSSynced           cache.InformerSynced
144
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
145
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
146

147
        // Subnets.
        subnetsLister           kubeovnlister.SubnetLister
148
        subnetSynced            cache.InformerSynced
149
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
150
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
151
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
152
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
153
        subnetKeyMutex          keymutex.KeyMutex
154

155
        // IP pools.
        ippoolLister            kubeovnlister.IPPoolLister
156
        ippoolSynced            cache.InformerSynced
157
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
158
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
159
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
160
        ippoolKeyMutex          keymutex.KeyMutex
161

162
        // IPs (kubeovnv1.IP).
        ipsLister     kubeovnlister.IPLister
163
        ipSynced      cache.InformerSynced
164
        ipIndexer     cache.Indexer
165
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
166
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
167
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
168

169
        // Virtual IPs (kubeovnv1.Vip).
        virtualIpsLister          kubeovnlister.VipLister
170
        virtualIpsSynced          cache.InformerSynced
171
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
172
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
173
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
174
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
175

176
        // Iptables EIPs.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
177
        iptablesEipSynced      cache.InformerSynced
178
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
179
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
180
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
181
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
182

183
        // Iptables FIP rules.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
184
        iptablesFipSynced      cache.InformerSynced
185
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
187
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        // Iptables DNAT rules.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
190
        iptablesDnatRuleSynced      cache.InformerSynced
191
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
193
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        // Iptables SNAT rules.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
196
        iptablesSnatRuleSynced      cache.InformerSynced
197
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
199
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200

201
        // OVN EIPs.
        ovnEipsLister     kubeovnlister.OvnEipLister
202
        ovnEipSynced      cache.InformerSynced
203
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
204
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
205
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
206
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
207

208
        // OVN FIPs.
        ovnFipsLister     kubeovnlister.OvnFipLister
209
        ovnFipSynced      cache.InformerSynced
210
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
211
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
212
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
213

214
        // OVN SNAT rules.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
215
        ovnSnatRuleSynced      cache.InformerSynced
216
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
217
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
218
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
219

220
        // OVN DNAT rules.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
221
        ovnDnatRuleSynced      cache.InformerSynced
222
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
223
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
224
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
225

226
        // Provider networks.
        providerNetworksLister kubeovnlister.ProviderNetworkLister
227
        providerNetworkSynced  cache.InformerSynced
228

229
        // VLANs.
        vlansLister     kubeovnlister.VlanLister
230
        vlanSynced      cache.InformerSynced
231
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
232
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
233
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
234
        vlanKeyMutex    keymutex.KeyMutex
235

236
        // Namespaces.
        namespacesLister  v1.NamespaceLister
237
        namespacesSynced  cache.InformerSynced
238
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
239
        nsKeyMutex        keymutex.KeyMutex
240

241
        // Nodes.
        nodesLister     v1.NodeLister
242
        nodesSynced     cache.InformerSynced
243
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
244
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
245
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
246
        nodeKeyMutex    keymutex.KeyMutex
247

248
        // Services.
        servicesLister     v1.ServiceLister
249
        serviceSynced      cache.InformerSynced
250
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
251
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
252
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
253
        svcKeyMutex        keymutex.KeyMutex
254

255
        // EndpointSlices.
        endpointSlicesLister          discoveryv1.EndpointSliceLister
256
        endpointSlicesSynced          cache.InformerSynced
257
        epsIndexer                    cache.Indexer
258
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
259
        epKeyMutex                    keymutex.KeyMutex
260

261
        // Deployments.
        deploymentsLister appsv1.DeploymentLister
262
        deploymentsSynced cache.InformerSynced
263

264
        // Network policies (networking/v1).
        npsLister     netv1.NetworkPolicyLister
265
        npsSynced     cache.InformerSynced
266
        npIndexer     cache.Indexer
267
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
268
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
269
        npKeyMutex    keymutex.KeyMutex
270

271
        // Security groups.
        sgsLister          kubeovnlister.SecurityGroupLister
272
        sgSynced           cache.InformerSynced
273
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
274
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
275
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
276
        sgKeyMutex         keymutex.KeyMutex
277

278
        // QoS policies.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
279
        qosPolicySynced      cache.InformerSynced
280
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
281
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
282
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
283

284
        // ConfigMaps.
        configMapsLister v1.ConfigMapLister
285
        configMapsSynced cache.InformerSynced
286

287
        // Admin network policies (v1alpha1).
        anpsLister     anplister.AdminNetworkPolicyLister
288
        anpsSynced     cache.InformerSynced
289
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
290
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
291
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
292
        anpKeyMutex    keymutex.KeyMutex
293

294
        // DNS name resolvers.
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
295
        dnsNameResolverIndexer          cache.Indexer
296
        dnsNameResolversSynced          cache.InformerSynced
297
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
298
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
299

300
        // Baseline admin network policies (v1alpha1).
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
301
        banpsSynced     cache.InformerSynced
302
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
303
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
304
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
305
        banpKeyMutex    keymutex.KeyMutex
306

307
        // Cluster network policies (v1alpha2).
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
308
        cnpsSynced     cache.InformerSynced
309
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
310
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
311
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
312
        cnpKeyMutex    keymutex.KeyMutex
313

314
        // Certificate signing requests.
        csrLister           certListerv1.CertificateSigningRequestLister
315
        csrSynced           cache.InformerSynced
316
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
317

318
        // KubeVirt: VMI migration and VM deletion queues plus the KubeVirt informer factory.
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
319
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
320
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
321

322
        // Network attachment definitions.
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
323
        netAttachSynced          cache.InformerSynced
324
        netAttachInformerFactory netAttach.SharedInformerFactory
325

326
        // ServiceCIDR: store, lister, sync check and its own informer factory.
        serviceCIDRStore           *util.ServiceCIDRStore
327
        serviceCIDRLister          netv1.ServiceCIDRLister
328
        serviceCIDRSynced          cache.InformerSynced
329
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
330

331
        // Event recorder and shared informer factories.
        recorder               record.EventRecorder
332
        informerFactory        kubeinformers.SharedInformerFactory
333
        cmInformerFactory      kubeinformers.SharedInformerFactory
334
        deployInformerFactory  kubeinformers.SharedInformerFactory
335
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
336
        anpInformerFactory     anpinformer.SharedInformerFactory
337

338
        // Database health check
339
        dbFailureCount int
340

341
        // distributedSubnetNeedSync is an atomic flag.
        // NOTE(review): its readers and writers are outside this chunk.
        distributedSubnetNeedSync atomic.Bool
342
}
343

344
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
345
        if rateLimiter == nil {
2✔
346
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
347
        }
1✔
348
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
349
}
350

351
// Run creates and runs a new ovn controller
352
func Run(ctx context.Context, config *Configuration) {
×
353
        klog.V(4).Info("Creating event broadcaster")
×
354
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
355
        eventBroadcaster.StartLogging(klog.Infof)
×
356
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
357
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
358
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
359
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
360
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
361
        )
×
362

×
NEW
363
        var err error
×
364
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
365
                kubeinformers.WithTransform(util.TrimPodForController),
×
366
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
367
                        listOption.AllowWatchBookmarks = true
×
368
                }))
×
369
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
370
                kubeinformers.WithNamespace(config.PodNamespace),
×
371
                kubeinformers.WithTransform(util.TrimManagedFields),
×
372
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
373
                        listOption.AllowWatchBookmarks = true
×
374
                }))
×
375
        // deployment informer used to list/watch vpc egress gateway and nat gateway workloads
376
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
377
                kubeinformers.WithTransform(util.TrimManagedFields),
×
378
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
379
                        listOption.AllowWatchBookmarks = true
×
380
                }))
×
381
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
382
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
383
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
384
                        listOption.AllowWatchBookmarks = true
×
385
                }))
×
386
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
387
                anpinformer.WithTransform(util.TrimManagedFields),
×
388
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
389
                        listOption.AllowWatchBookmarks = true
×
390
                }))
×
391
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
392
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
393
                        listOption.AllowWatchBookmarks = true
×
394
                }),
×
395
        )
396
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
397
                informer.WithTransform(util.TrimManagedFields),
×
398
        )
×
399
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
400
        // failed list/watch does not contaminate the main informer factory.
×
401
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
402
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
403
                        listOption.AllowWatchBookmarks = true
×
404
                }),
×
405
        )
406

407
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
408
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
409
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
410
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
411
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
412
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
413
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
414
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
415
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
416
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
417
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
418
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
419
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
420
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
421
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
422
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
423
        podInformer := informerFactory.Core().V1().Pods()
×
424
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
425
        nodeInformer := informerFactory.Core().V1().Nodes()
×
426
        serviceInformer := informerFactory.Core().V1().Services()
×
427
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
428
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
429
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
430
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
431
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
432
        routerLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().RouterLBRules()
×
433
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
434
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
435
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
436
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
437
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
438
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
439
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
440
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
441
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
442
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
443
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
444
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
445

×
446
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
447
        controller := &Controller{
×
448
                config:             config,
×
449
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
450
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
451
                ipam:               ovnipam.NewIPAM(),
×
452
                namedPort:          NewNamedPort(),
×
453

×
454
                vpcsLister:           vpcInformer.Lister(),
×
455
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
456
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
457
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
458
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
459
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
460
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
461

×
462
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
463
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
464
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
465
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
466
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
467
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
468
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
469
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
470
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
471
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
472
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
473
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
474
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
475
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
476
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
477
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
478
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
479

×
480
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
481
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
482

×
483
                subnetsLister:           subnetInformer.Lister(),
×
484
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
485
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
486
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
487
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
488
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
489
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
490

×
491
                ippoolLister:            ippoolInformer.Lister(),
×
492
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
493
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
494
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
495
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
496
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
497

×
498
                ipsLister:     ipInformer.Lister(),
×
499
                ipSynced:      ipInformer.Informer().HasSynced,
×
500
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
501
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
502
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
503

×
504
                virtualIpsLister:          virtualIPInformer.Lister(),
×
505
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
506
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
507
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
508
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
509
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
510

×
511
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
512
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
513
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
514
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
515
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
516
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
517

×
518
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
519
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
520
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
521
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
522
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
523

×
524
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
525
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
526
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
527
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
528
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
529

×
530
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
531
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
532
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
533
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
534
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
535

×
536
                vlansLister:     vlanInformer.Lister(),
×
537
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
538
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
539
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
540
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
541
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
542

×
543
                providerNetworksLister: providerNetworkInformer.Lister(),
×
544
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
545

×
546
                podsLister:          podInformer.Lister(),
×
547
                podsSynced:          podInformer.Informer().HasSynced,
×
548
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
549
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
550
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
551
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
552
                                Name:          "DeletePod",
×
553
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
554
                        },
×
555
                ),
×
556
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
557
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
558

×
559
                namespacesLister:  namespaceInformer.Lister(),
×
560
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
561
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
562
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
563

×
564
                nodesLister:     nodeInformer.Lister(),
×
565
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
566
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
567
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
568
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
569
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
570

×
571
                servicesLister:     serviceInformer.Lister(),
×
572
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
573
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
574
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
575
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
576
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
577

×
578
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
579
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
580
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
581
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
582

×
583
                deploymentsLister: deploymentInformer.Lister(),
×
584
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
585

×
586
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
587
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
588
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
589
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
590
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
591

×
592
                configMapsLister: configMapInformer.Lister(),
×
593
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
594

×
595
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
596
                sgsLister:          sgInformer.Lister(),
×
597
                sgSynced:           sgInformer.Informer().HasSynced,
×
598
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
599
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
600
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
601

×
602
                ovnEipsLister:     ovnEipInformer.Lister(),
×
603
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
604
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
605
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
606
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
607
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
608

×
609
                ovnFipsLister:     ovnFipInformer.Lister(),
×
610
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
611
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
612
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
613
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
614

×
615
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
616
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
617
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
618
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
619
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
620

×
621
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
622
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
623
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
624
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
625
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
626

×
627
                csrLister:           csrInformer.Lister(),
×
628
                csrSynced:           csrInformer.Informer().HasSynced,
×
629
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
630

×
631
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
632
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
633
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
634

×
635
                netAttachLister:          netAttachInformer.Lister(),
×
636
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
637
                netAttachInformerFactory: attachNetInformerFactory,
×
638

×
639
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
640
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
641

×
642
                recorder:               recorder,
×
643
                informerFactory:        informerFactory,
×
644
                cmInformerFactory:      cmInformerFactory,
×
645
                deployInformerFactory:  deployInformerFactory,
×
646
                kubeovnInformerFactory: kubeovnInformerFactory,
×
647
                anpInformerFactory:     anpInformerFactory,
×
648
        }
×
649

×
650
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
651
                config.OvnNbAddr,
×
652
                config.OvnTimeout,
×
653
                config.OvsDbConnectTimeout,
×
654
                config.OvsDbInactivityTimeout,
×
655
                config.OvsDbConnectMaxRetry,
×
656
        ); err != nil {
×
657
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
658
        }
×
659
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
660
                config.OvnSbAddr,
×
661
                config.OvnTimeout,
×
662
                config.OvsDbConnectTimeout,
×
663
                config.OvsDbInactivityTimeout,
×
664
                config.OvsDbConnectMaxRetry,
×
665
        ); err != nil {
×
666
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
667
        }
×
668
        if config.EnableLb {
×
669
                controller.routerLBRuleLister = routerLBRuleInformer.Lister()
×
670
                controller.routerLBRuleSynced = routerLBRuleInformer.Informer().HasSynced
×
671
                controller.addRouterLBRuleQueue = newTypedRateLimitingQueue("AddRouterLBRule", custCrdRateLimiter)
×
672
                controller.delRouterLBRuleQueue = newTypedRateLimitingQueue(
×
673
                        "DeleteRouterLBRule",
×
674
                        workqueue.NewTypedMaxOfRateLimiter(
×
675
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
676
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
677
                        ),
×
678
                )
×
679
                controller.updateRouterLBRuleQueue = newTypedRateLimitingQueue(
×
680
                        "UpdateRouterLBRule",
×
681
                        workqueue.NewTypedMaxOfRateLimiter(
×
682
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
683
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
684
                        ),
×
685
                )
×
686

×
687
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
688
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
689
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
690
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
691
                        "DeleteSwitchLBRule",
×
692
                        workqueue.NewTypedMaxOfRateLimiter(
×
693
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
694
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
695
                        ),
×
696
                )
×
697
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
698
                        "UpdateSwitchLBRule",
×
699
                        workqueue.NewTypedMaxOfRateLimiter(
×
700
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
701
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
702
                        ),
×
703
                )
×
704

×
705
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
706
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
707
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
708
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
709
        }
×
710

711
        if config.EnableNP {
×
712
                controller.npsLister = npInformer.Lister()
×
713
                controller.npsSynced = npInformer.Informer().HasSynced
×
714
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
715
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
716
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
717
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
718
        }
×
719

720
        if config.EnableANP {
×
721
                controller.anpsLister = anpInformer.Lister()
×
722
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
723
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
724
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
725
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
726
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
727

×
728
                controller.banpsLister = banpInformer.Lister()
×
729
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
730
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
731
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
732
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
733
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
734

×
735
                controller.cnpsLister = cnpInformer.Lister()
×
736
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
737
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
738
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
739
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
740
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
741
        }
×
742

743
        if config.EnableDNSNameResolver {
×
744
                if !config.EnableANP {
×
745
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
746
                }
×
747
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
748
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
749
                if err := dnsNameResolverInformer.Informer().AddIndexers(cache.Indexers{
×
750
                        IndexDNSNameResolverByName: indexDNSNameResolverByName,
×
751
                }); err != nil {
×
752
                        util.LogFatalAndExit(err, "failed to add DNSNameResolver indexer")
×
753
                }
×
754
                controller.dnsNameResolverIndexer = dnsNameResolverInformer.Informer().GetIndexer()
×
755
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
756
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
757
        }
758

759
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer(), ipInformer.Informer()); err != nil {
×
760
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
761
        }
×
762

763
        defer controller.shutdown()
×
764
        klog.Info("Starting OVN controller")
×
765

×
766
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
767
        // NAD CRD is optional, so we check if it exists before starting the informer
×
768
        controller.StartNetAttachInformerFactory(ctx)
×
769

×
770
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
771
        // don't have the API at all. Best-effort start with periodic retry.
×
772
        controller.StartServiceCIDRInformerFactory(ctx)
×
773

×
774
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
775
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
776
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
777
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
778

×
779
        // Start the informer factories; their caches are awaited (WaitForCacheSync below) before starting workers
×
780
        controller.informerFactory.Start(ctx.Done())
×
781
        controller.cmInformerFactory.Start(ctx.Done())
×
782
        controller.deployInformerFactory.Start(ctx.Done())
×
783
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
784
        controller.anpInformerFactory.Start(ctx.Done())
×
785
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
786

×
787
        klog.Info("Waiting for informer caches to sync")
×
788
        cacheSyncs := []cache.InformerSynced{
×
789
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
790
                controller.vpcSynced, controller.subnetSynced,
×
791
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
792
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
793
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
794
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
795
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
796
                controller.ovnDnatRuleSynced,
×
797
        }
×
798
        if controller.config.EnableLb {
×
799
                cacheSyncs = append(cacheSyncs, controller.routerLBRuleSynced, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
800
        }
×
801
        if controller.config.EnableNP {
×
802
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
803
        }
×
804
        if controller.config.EnableANP {
×
805
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
806
        }
×
807
        if controller.config.EnableDNSNameResolver {
×
808
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
809
        }
×
810

811
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
812
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
813
        }
×
814

815
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
816
                AddFunc:    controller.enqueueAddPod,
×
817
                DeleteFunc: controller.enqueueDeletePod,
×
818
                UpdateFunc: controller.enqueueUpdatePod,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
821
        }
×
822

823
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddNamespace,
×
825
                UpdateFunc: controller.enqueueUpdateNamespace,
×
826
                DeleteFunc: controller.enqueueDeleteNamespace,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
829
        }
×
830

831
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddNode,
×
833
                UpdateFunc: controller.enqueueUpdateNode,
×
834
                DeleteFunc: controller.enqueueDeleteNode,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add node event handler")
×
837
        }
×
838

839
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddService,
×
841
                DeleteFunc: controller.enqueueDeleteService,
×
842
                UpdateFunc: controller.enqueueUpdateService,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add service event handler")
×
845
        }
×
846

847
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddEndpointSlice,
×
849
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
850
        }); err != nil {
×
851
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
852
        }
×
853

NEW
854
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
×
NEW
855
                FilterFunc: func(obj any) bool {
×
NEW
856
                        if deploy, ok := obj.(*appsv1api.Deployment); ok {
×
NEW
857
                                // Only watch deployments with VpcEgressGatewayLabel or VpcNatGatewayLabel
×
NEW
858
                                _, hasNatGwLabel := deploy.Labels[util.VpcNatGatewayLabel]
×
NEW
859
                                _, hasEgressGwLabel := deploy.Labels[util.VpcEgressGatewayLabel]
×
NEW
860
                                return hasNatGwLabel || hasEgressGwLabel
×
NEW
861
                        }
×
NEW
862
                        return false
×
863
                },
864
                Handler: cache.ResourceEventHandlerFuncs{
865
                        AddFunc:    controller.enqueueAddDeployment,
866
                        UpdateFunc: controller.enqueueUpdateDeployment,
867
                },
868
        }); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
870
        }
×
871

872
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                AddFunc:    controller.enqueueAddVpc,
×
874
                UpdateFunc: controller.enqueueUpdateVpc,
×
875
                DeleteFunc: controller.enqueueDelVpc,
×
876
        }); err != nil {
×
877
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
878
        }
×
879

880
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                AddFunc:    controller.enqueueAddVpcNatGw,
×
882
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
883
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
884
        }); err != nil {
×
885
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
886
        }
×
887

888
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
890
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
891
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
892
        }); err != nil {
×
893
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
894
        }
×
895

896
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                AddFunc:    controller.enqueueAddSubnet,
×
898
                UpdateFunc: controller.enqueueUpdateSubnet,
×
899
                DeleteFunc: controller.enqueueDeleteSubnet,
×
900
        }); err != nil {
×
901
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
902
        }
×
903

904
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                AddFunc:    controller.enqueueAddIPPool,
×
906
                UpdateFunc: controller.enqueueUpdateIPPool,
×
907
                DeleteFunc: controller.enqueueDeleteIPPool,
×
908
        }); err != nil {
×
909
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
910
        }
×
911

912
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                AddFunc:    controller.enqueueAddIP,
×
914
                UpdateFunc: controller.enqueueUpdateIP,
×
915
                DeleteFunc: controller.enqueueDelIP,
×
916
        }); err != nil {
×
917
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
918
        }
×
919

920
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
921
                AddFunc:    controller.enqueueAddVlan,
×
922
                DeleteFunc: controller.enqueueDelVlan,
×
923
                UpdateFunc: controller.enqueueUpdateVlan,
×
924
        }); err != nil {
×
925
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
926
        }
×
927

928
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
929
                AddFunc:    controller.enqueueAddSg,
×
930
                DeleteFunc: controller.enqueueDeleteSg,
×
931
                UpdateFunc: controller.enqueueUpdateSg,
×
932
        }); err != nil {
×
933
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
934
        }
×
935

936
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
937
                AddFunc:    controller.enqueueAddVirtualIP,
×
938
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
939
                DeleteFunc: controller.enqueueDelVirtualIP,
×
940
        }); err != nil {
×
941
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
942
        }
×
943

944
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
945
                AddFunc:    controller.enqueueAddIptablesEip,
×
946
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
947
                DeleteFunc: controller.enqueueDelIptablesEip,
×
948
        }); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
950
        }
×
951

952
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
953
                AddFunc:    controller.enqueueAddIptablesFip,
×
954
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
955
                DeleteFunc: controller.enqueueDelIptablesFip,
×
956
        }); err != nil {
×
957
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
958
        }
×
959

960
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
961
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
962
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
963
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
964
        }); err != nil {
×
965
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
966
        }
×
967

968
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
969
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
970
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
971
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
972
        }); err != nil {
×
973
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
974
        }
×
975

976
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
977
                AddFunc:    controller.enqueueAddOvnEip,
×
978
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
979
                DeleteFunc: controller.enqueueDelOvnEip,
×
980
        }); err != nil {
×
981
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
982
        }
×
983

984
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
985
                AddFunc:    controller.enqueueAddOvnFip,
×
986
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
987
                DeleteFunc: controller.enqueueDelOvnFip,
×
988
        }); err != nil {
×
989
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
990
        }
×
991

992
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
993
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
994
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
995
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
996
        }); err != nil {
×
997
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
998
        }
×
999

1000
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1001
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
1002
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
1003
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
1004
        }); err != nil {
×
1005
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
1006
        }
×
1007

1008
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1009
                AddFunc:    controller.enqueueAddQoSPolicy,
×
1010
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
1011
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
1012
        }); err != nil {
×
1013
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
1014
        }
×
1015

1016
        if config.EnableLb {
×
1017
                if _, err = routerLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1018
                        AddFunc:    controller.enqueueAddRouterLBRule,
×
1019
                        UpdateFunc: controller.enqueueUpdateRouterLBRule,
×
1020
                        DeleteFunc: controller.enqueueDeleteRouterLBRule,
×
1021
                }); err != nil {
×
1022
                        util.LogFatalAndExit(err, "failed to add router lb rule event handler")
×
1023
                }
×
1024

1025
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1026
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
1027
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
1028
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
1029
                }); err != nil {
×
1030
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
1031
                }
×
1032

1033
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1034
                        AddFunc:    controller.enqueueAddVpcDNS,
×
1035
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
1036
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
1037
                }); err != nil {
×
1038
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
1039
                }
×
1040
        }
1041

1042
        if config.EnableNP {
×
1043
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1044
                        AddFunc:    controller.enqueueAddNp,
×
1045
                        UpdateFunc: controller.enqueueUpdateNp,
×
1046
                        DeleteFunc: controller.enqueueDeleteNp,
×
1047
                }); err != nil {
×
1048
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
1049
                }
×
1050
        }
1051

1052
        if config.EnableANP {
×
1053
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1054
                        AddFunc:    controller.enqueueAddAnp,
×
1055
                        UpdateFunc: controller.enqueueUpdateAnp,
×
1056
                        DeleteFunc: controller.enqueueDeleteAnp,
×
1057
                }); err != nil {
×
1058
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1059
                }
×
1060

1061
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1062
                        AddFunc:    controller.enqueueAddBanp,
×
1063
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1064
                        DeleteFunc: controller.enqueueDeleteBanp,
×
1065
                }); err != nil {
×
1066
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1067
                }
×
1068

1069
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1070
                        AddFunc:    controller.enqueueAddCnp,
×
1071
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1072
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1073
                }); err != nil {
×
1074
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1075
                }
×
1076

1077
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1078
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1079
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1080
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1081
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1082
        }
1083

1084
        if config.EnableDNSNameResolver {
×
1085
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1086
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1087
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1088
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1089
                }); err != nil {
×
1090
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1091
                }
×
1092
        }
1093

1094
        if config.EnableOVNIPSec {
×
1095
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1096
                        AddFunc:    controller.enqueueAddCsr,
×
1097
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1098
                        // no need to add delete func for csr
×
1099
                }); err != nil {
×
1100
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1101
                }
×
1102
        }
1103

1104
        controller.Run(ctx)
×
1105
}
1106

1107
// Run will set up the event handlers for types we are interested in, as well
1108
// as syncing informer caches and starting workers. It will block until stopCh
1109
// is closed, at which point it will shutdown the workqueue and wait for
1110
// workers to finish processing their current work items.
1111
func (c *Controller) Run(ctx context.Context) {
×
1112
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1113
        // Otherwise, the init process should be placed after all workers have already started working
×
1114
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1115
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1116
        }
×
1117

1118
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1119
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1120
        }
×
1121

1122
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1123
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1124
        }
×
1125

1126
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1127
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1128
        }
×
1129

1130
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1131
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1132
        }
×
1133

1134
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1135
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1136
        }
×
1137

1138
        if err := c.InitOVN(); err != nil {
×
1139
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1140
        }
×
1141

1142
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1143
        if err := c.syncIPCR(); err != nil {
×
1144
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1145
        }
×
1146

1147
        if err := c.syncFinalizers(); err != nil {
×
1148
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1149
        }
×
1150

1151
        if err := c.InitIPAM(); err != nil {
×
1152
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1153
        }
×
1154

1155
        if err := c.syncNodeRoutes(); err != nil {
×
1156
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1157
        }
×
1158

1159
        if err := c.syncSubnetCR(); err != nil {
×
1160
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1161
        }
×
1162

1163
        if err := c.syncVlanCR(); err != nil {
×
1164
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1165
        }
×
1166

1167
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1168
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1169
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1170
                }
×
1171
        }
1172

1173
        // start workers to do all the network operations
1174
        c.startWorkers(ctx)
×
1175

×
1176
        c.initResourceOnce()
×
1177
        <-ctx.Done()
×
1178
        klog.Info("Shutting down workers")
×
1179

×
1180
        c.OVNNbClient.Close()
×
1181
        c.OVNSbClient.Close()
×
1182
}
1183

1184
func (c *Controller) dbStatus() {
×
1185
        const maxFailures = 5
×
1186

×
1187
        done := make(chan error, 2)
×
1188
        go func() {
×
1189
                done <- c.OVNNbClient.Echo(context.Background())
×
1190
        }()
×
1191
        go func() {
×
1192
                done <- c.OVNSbClient.Echo(context.Background())
×
1193
        }()
×
1194

1195
        resultsReceived := 0
×
1196
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1197

×
1198
        for resultsReceived < 2 {
×
1199
                select {
×
1200
                case err := <-done:
×
1201
                        resultsReceived++
×
1202
                        if err != nil {
×
1203
                                c.dbFailureCount++
×
1204
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1205
                                if c.dbFailureCount >= maxFailures {
×
1206
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1207
                                }
×
1208
                                return
×
1209
                        }
1210
                case <-timeout:
×
1211
                        c.dbFailureCount++
×
1212
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1213
                        if c.dbFailureCount >= maxFailures {
×
1214
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1215
                        }
×
1216
                        return
×
1217
                }
1218
        }
1219

1220
        if c.dbFailureCount > 0 {
×
1221
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1222
                c.dbFailureCount = 0
×
1223
        }
×
1224
}
1225

1226
func (c *Controller) shutdown() {
×
1227
        utilruntime.HandleCrash()
×
1228

×
1229
        c.addOrUpdatePodQueue.ShutDown()
×
1230
        c.deletePodQueue.ShutDown()
×
1231
        c.updatePodSecurityQueue.ShutDown()
×
1232

×
1233
        c.addNamespaceQueue.ShutDown()
×
1234

×
1235
        c.addOrUpdateSubnetQueue.ShutDown()
×
1236
        c.deleteSubnetQueue.ShutDown()
×
1237
        c.updateSubnetStatusQueue.ShutDown()
×
1238
        c.syncVirtualPortsQueue.ShutDown()
×
1239

×
1240
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1241
        c.updateIPPoolStatusQueue.ShutDown()
×
1242
        c.deleteIPPoolQueue.ShutDown()
×
1243

×
1244
        c.addNodeQueue.ShutDown()
×
1245
        c.updateNodeQueue.ShutDown()
×
1246
        c.deleteNodeQueue.ShutDown()
×
1247

×
1248
        c.addServiceQueue.ShutDown()
×
1249
        c.deleteServiceQueue.ShutDown()
×
1250
        c.updateServiceQueue.ShutDown()
×
1251
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1252

×
1253
        c.addVlanQueue.ShutDown()
×
1254
        c.delVlanQueue.ShutDown()
×
1255
        c.updateVlanQueue.ShutDown()
×
1256

×
1257
        c.addOrUpdateVpcQueue.ShutDown()
×
1258
        c.updateVpcStatusQueue.ShutDown()
×
1259
        c.delVpcQueue.ShutDown()
×
1260

×
1261
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1262
        c.initVpcNatGatewayQueue.ShutDown()
×
1263
        c.delVpcNatGatewayQueue.ShutDown()
×
1264
        c.updateVpcEipQueue.ShutDown()
×
1265
        c.updateVpcFloatingIPQueue.ShutDown()
×
1266
        c.updateVpcDnatQueue.ShutDown()
×
1267
        c.updateVpcSnatQueue.ShutDown()
×
1268
        c.updateVpcSubnetQueue.ShutDown()
×
1269

×
1270
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1271
        c.delVpcEgressGatewayQueue.ShutDown()
×
1272

×
1273
        if c.config.EnableLb {
×
1274
                c.addRouterLBRuleQueue.ShutDown()
×
1275
                c.delRouterLBRuleQueue.ShutDown()
×
1276
                c.updateRouterLBRuleQueue.ShutDown()
×
1277

×
1278
                c.addSwitchLBRuleQueue.ShutDown()
×
1279
                c.delSwitchLBRuleQueue.ShutDown()
×
1280
                c.updateSwitchLBRuleQueue.ShutDown()
×
1281

×
1282
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1283
                c.delVpcDNSQueue.ShutDown()
×
1284
        }
×
1285

1286
        c.addIPQueue.ShutDown()
×
1287
        c.updateIPQueue.ShutDown()
×
1288
        c.delIPQueue.ShutDown()
×
1289

×
1290
        c.addVirtualIPQueue.ShutDown()
×
1291
        c.updateVirtualIPQueue.ShutDown()
×
1292
        c.updateVirtualParentsQueue.ShutDown()
×
1293
        c.delVirtualIPQueue.ShutDown()
×
1294

×
1295
        c.addIptablesEipQueue.ShutDown()
×
1296
        c.updateIptablesEipQueue.ShutDown()
×
1297
        c.resetIptablesEipQueue.ShutDown()
×
1298
        c.delIptablesEipQueue.ShutDown()
×
1299

×
1300
        c.addIptablesFipQueue.ShutDown()
×
1301
        c.updateIptablesFipQueue.ShutDown()
×
1302
        c.delIptablesFipQueue.ShutDown()
×
1303

×
1304
        c.addIptablesDnatRuleQueue.ShutDown()
×
1305
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1306
        c.delIptablesDnatRuleQueue.ShutDown()
×
1307

×
1308
        c.addIptablesSnatRuleQueue.ShutDown()
×
1309
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1310
        c.delIptablesSnatRuleQueue.ShutDown()
×
1311

×
1312
        c.addQoSPolicyQueue.ShutDown()
×
1313
        c.updateQoSPolicyQueue.ShutDown()
×
1314
        c.delQoSPolicyQueue.ShutDown()
×
1315

×
1316
        c.addOvnEipQueue.ShutDown()
×
1317
        c.updateOvnEipQueue.ShutDown()
×
1318
        c.resetOvnEipQueue.ShutDown()
×
1319
        c.delOvnEipQueue.ShutDown()
×
1320

×
1321
        c.addOvnFipQueue.ShutDown()
×
1322
        c.updateOvnFipQueue.ShutDown()
×
1323
        c.delOvnFipQueue.ShutDown()
×
1324

×
1325
        c.addOvnSnatRuleQueue.ShutDown()
×
1326
        c.updateOvnSnatRuleQueue.ShutDown()
×
1327
        c.delOvnSnatRuleQueue.ShutDown()
×
1328

×
1329
        c.addOvnDnatRuleQueue.ShutDown()
×
1330
        c.updateOvnDnatRuleQueue.ShutDown()
×
1331
        c.delOvnDnatRuleQueue.ShutDown()
×
1332

×
1333
        if c.config.EnableNP {
×
1334
                c.updateNpQueue.ShutDown()
×
1335
                c.deleteNpQueue.ShutDown()
×
1336
        }
×
1337
        if c.config.EnableANP {
×
1338
                c.addAnpQueue.ShutDown()
×
1339
                c.updateAnpQueue.ShutDown()
×
1340
                c.deleteAnpQueue.ShutDown()
×
1341

×
1342
                c.addBanpQueue.ShutDown()
×
1343
                c.updateBanpQueue.ShutDown()
×
1344
                c.deleteBanpQueue.ShutDown()
×
1345

×
1346
                c.addCnpQueue.ShutDown()
×
1347
                c.updateCnpQueue.ShutDown()
×
1348
                c.deleteCnpQueue.ShutDown()
×
1349
        }
×
1350

1351
        if c.config.EnableDNSNameResolver {
×
1352
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1353
                c.deleteDNSNameResolverQueue.ShutDown()
×
1354
        }
×
1355

1356
        c.addOrUpdateSgQueue.ShutDown()
×
1357
        c.delSgQueue.ShutDown()
×
1358
        c.syncSgPortsQueue.ShutDown()
×
1359

×
1360
        c.addOrUpdateCsrQueue.ShutDown()
×
1361

×
1362
        if c.config.EnableLiveMigrationOptimize {
×
1363
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1364
        }
×
1365
}
1366

1367
func (c *Controller) startWorkers(ctx context.Context) {
×
1368
        klog.Info("Starting workers")
×
1369

×
1370
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1371
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1373

×
1374
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1375
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1376
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1377
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1378
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1379
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1380
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1381
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1382
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1383
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1384
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1385
        // add default and join subnet and wait them ready
×
1386
        for range c.config.WorkerNum {
×
1387
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1388
        }
×
1389
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1390
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1391
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1392
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1393
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1394
                klog.Infof("wait for subnets %v ready", subnets)
×
1395

×
1396
                return c.allSubnetReady(subnets...)
×
1397
        })
×
1398
        if err != nil {
×
1399
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1400
        }
×
1401

1402
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1403
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1404
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1405

×
1406
        // run node worker before handle any pods
×
1407
        for range c.config.WorkerNum {
×
1408
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1409
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1410
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1411
        }
×
1412
        for {
×
1413
                ready := true
×
1414
                time.Sleep(3 * time.Second)
×
1415
                nodes, err := c.nodesLister.List(labels.Everything())
×
1416
                if err != nil {
×
1417
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1418
                }
×
1419
                for _, node := range nodes {
×
1420
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1421
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1422
                                ready = false
×
1423
                                break
×
1424
                        }
1425
                }
1426
                if ready {
×
1427
                        break
×
1428
                }
1429
        }
1430

1431
        if c.config.EnableLb {
×
1432
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1433
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1434
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1435

×
1436
                go wait.Until(runWorker("add/update router lb rule", c.addRouterLBRuleQueue, c.handleAddOrUpdateRouterLBRule), time.Second, ctx.Done())
×
1437
                go wait.Until(runWorker("delete router lb rule", c.delRouterLBRuleQueue, c.handleDelRouterLBRule), time.Second, ctx.Done())
×
1438
                go wait.Until(runWorker("update router lb rule", c.updateRouterLBRuleQueue, c.handleUpdateRouterLBRule), time.Second, ctx.Done())
×
1439

×
1440
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1441
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1442
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1443

×
1444
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1445
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1446
                go wait.Until(func() {
×
1447
                        c.resyncVpcDNSConfig()
×
1448
                }, 5*time.Second, ctx.Done())
×
1449
        }
1450

1451
        for range c.config.WorkerNum {
×
1452
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1453
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1454
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1455

×
1456
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1457
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1458
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1459
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1460
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1461

×
1462
                if c.config.EnableLb {
×
1463
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1464
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1465
                }
×
1466

1467
                if c.config.EnableNP {
×
1468
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1469
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1470
                }
×
1471

1472
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1473
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1474
        }
1475

1476
        if c.config.EnableEipSnat {
×
1477
                go wait.Until(func() {
×
1478
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1479
                        c.resyncExternalGateway()
×
1480
                }, time.Second, ctx.Done())
×
1481

1482
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1483
                c.OVNNbClient.MonitorBFD()
×
1484
        }
1485
        // TODO: we should merge these two vpc nat config into one config and resync them together
1486
        go wait.Until(func() {
×
1487
                c.resyncVpcNatGwConfig()
×
1488
        }, time.Second, ctx.Done())
×
1489

1490
        go wait.Until(func() {
×
1491
                c.resyncVpcNatConfig()
×
1492
        }, time.Second, ctx.Done())
×
1493

1494
        if c.config.GCInterval != 0 {
×
1495
                go wait.Until(func() {
×
1496
                        if err := c.markAndCleanLSP(); err != nil {
×
1497
                                klog.Errorf("gc lsp error: %v", err)
×
1498
                        }
×
1499
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1500
        }
1501

1502
        go wait.Until(func() {
×
1503
                if err := c.inspectPod(); err != nil {
×
1504
                        klog.Errorf("inspection error: %v", err)
×
1505
                }
×
1506
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1507

1508
        if c.config.EnableExternalVpc {
×
1509
                go wait.Until(func() {
×
1510
                        c.syncExternalVpc()
×
1511
                }, 5*time.Second, ctx.Done())
×
1512
        }
1513

1514
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1515
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1516
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1517
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1518

×
1519
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1520
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1521
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1522
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1523

×
1524
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1525
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1526
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1527

×
1528
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1529
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1530
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1531

×
1532
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1533
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1534
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1535

×
1536
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1537

×
1538
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1539
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1540
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1541

×
1542
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1543
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1544
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1545
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1546

×
1547
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1548
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1549
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1550
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1551

×
1552
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1553
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1554
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1555

×
1556
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1557
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1558
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1559

×
1560
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1561
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1562
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1563

×
1564
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1565
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1566
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1567

×
1568
        if c.config.EnableANP {
×
1569
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1570
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1571
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1572

×
1573
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1574
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1575
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1576

×
1577
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1578
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1579
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1580
        }
×
1581

1582
        if c.config.EnableDNSNameResolver {
×
1583
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1584
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1585
        }
×
1586

1587
        if c.config.EnableLiveMigrationOptimize {
×
1588
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1589
        }
×
1590

1591
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1592

×
1593
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1594
}
1595

1596
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1597
        for _, lsName := range subnets {
2✔
1598
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1599
                if err != nil {
1✔
1600
                        klog.Error(err)
×
1601
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1602
                }
×
1603

1604
                if !exist {
2✔
1605
                        return false, nil
1✔
1606
                }
1✔
1607
        }
1608

1609
        return true, nil
1✔
1610
}
1611

1612
func (c *Controller) initResourceOnce() {
×
1613
        c.registerSubnetMetrics()
×
1614

×
1615
        if err := c.initNodeChassis(); err != nil {
×
1616
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1617
        }
×
1618

1619
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1620
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1621
        }
×
1622
        if err := c.syncSecurityGroup(); err != nil {
×
1623
                util.LogFatalAndExit(err, "failed to sync security group")
×
1624
        }
×
1625

1626
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1627
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1628
        }
×
1629

1630
        if err := c.initVpcNatGw(); err != nil {
×
1631
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1632
        }
×
1633
        if c.config.EnableLb {
×
1634
                if err := c.initVpcDNSConfig(); err != nil {
×
1635
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1636
                }
×
1637
        }
1638

1639
        // remove resources in ovndb that not exist any more in kubernetes resources
1640
        // process gc at last in case of affecting other init process
1641
        if err := c.gc(); err != nil {
×
1642
                util.LogFatalAndExit(err, "failed to run gc")
×
1643
        }
×
1644
}
1645

1646
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1647
        item, shutdown := queue.Get()
×
1648
        if shutdown {
×
1649
                return false
×
1650
        }
×
1651

1652
        err := func(item T) error {
×
1653
                defer queue.Done(item)
×
1654
                if err := handler(item); err != nil {
×
1655
                        queue.AddRateLimited(item)
×
1656
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1657
                }
×
1658
                queue.Forget(item)
×
1659
                return nil
×
1660
        }(item)
1661
        if err != nil {
×
1662
                utilruntime.HandleError(err)
×
1663
                return true
×
1664
        }
×
1665
        return true
×
1666
}
1667

1668
func getWorkItemKey(obj any) string {
×
1669
        switch v := obj.(type) {
×
1670
        case string:
×
1671
                return v
×
1672
        case *vpcService:
×
1673
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1674
        case *AdminNetworkPolicyChangedDelta:
×
1675
                return v.key
×
1676
        case *SwitchLBRuleInfo:
×
1677
                return v.Name
×
1678
        default:
×
1679
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1680
                if err != nil {
×
1681
                        utilruntime.HandleError(err)
×
1682
                        return ""
×
1683
                }
×
1684
                return key
×
1685
        }
1686
}
1687

1688
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1689
        return func() {
×
1690
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1691
                }
×
1692
        }
1693
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc