• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27746779844

18 Jun 2026 08:26AM UTC coverage: 26.796% (+0.1%) from 26.652%
27746779844

push

github

web-flow
fix: reconcile vpc bfd ha chassis on node changes (#6891)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

54 of 73 new or added lines in 4 files covered. (73.97%)

2 existing lines in 1 file now uncovered.

15979 of 59632 relevant lines covered (26.8%)

0.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.08
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        appsv1api "k8s.io/api/apps/v1"
17
        corev1 "k8s.io/api/core/v1"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        appsv1 "k8s.io/client-go/listers/apps/v1"
26
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
27
        v1 "k8s.io/client-go/listers/core/v1"
28
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
29
        netv1 "k8s.io/client-go/listers/networking/v1"
30
        "k8s.io/client-go/tools/cache"
31
        "k8s.io/client-go/tools/record"
32
        "k8s.io/client-go/util/workqueue"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/keymutex"
35
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
36
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
37
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
38
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
39
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
40

41
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
42
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
43
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
44
        "github.com/kubeovn/kube-ovn/pkg/informer"
45
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
46
        "github.com/kubeovn/kube-ovn/pkg/ovs"
47
        "github.com/kubeovn/kube-ovn/pkg/util"
48
)
49

50
const controllerAgentName = "kube-ovn-controller"
51

52
// String key constants private to the controller package. Each value is a
// short tag for the concept named by its identifier (for example "ls" for
// logical switch, "lr" for logical router, "pg" for port group).
// NOTE(review): where these keys are consumed is not visible in this
// excerpt — confirm against the call sites before changing any value.
const (
53
        logicalSwitchKey              = "ls"
54
        logicalRouterKey              = "lr"
55
        portGroupKey                  = "pg"
56
        networkPolicyKey              = "np"
57
        sgKey                         = "sg"
58
        sgsKey                        = "security_groups"
59
        u2oKey                        = "u2o"
60
        adminNetworkPolicyKey         = "anp"
61
        baselineAdminNetworkPolicyKey = "banp"
62
        ippoolKey                     = "ippool"
63
        clusterNetworkPolicyKey       = "cnp"
64
)
65

66
// Controller is the main kube-ovn controller that watches ns/pod/node/svc/ep and operates OVN.
67
type Controller struct {
68
        config *Configuration
69

70
        ipam             *ovnipam.IPAM
71
        namedPort        *NamedPort
72
        anpPrioNameMap   map[int32]string
73
        anpNamePrioMap   map[string]int32
74
        bnpPrioNameMap   map[int32]string
75
        bnpNamePrioMap   map[string]int32
76
        priorityMapMutex sync.RWMutex
77

78
        OVNNbClient ovs.NbClient
79
        OVNSbClient ovs.SbClient
80

81
        // ExternalGatewayType defines the external gateway type, centralized
82
        ExternalGatewayType string
83

84
        podsLister             v1.PodLister
85
        podsSynced             cache.InformerSynced
86
        podIndexer             cache.Indexer
87
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
88
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
89
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
90
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
91
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
92
        podKeyMutex            keymutex.KeyMutex
93

94
        vpcsLister           kubeovnlister.VpcLister
95
        vpcSynced            cache.InformerSynced
96
        vpcIndexer           cache.Indexer
97
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
98
        vpcLastPoliciesMap   *xsync.Map[string, string]
99
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
100
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
101
        vpcKeyMutex          keymutex.KeyMutex
102

103
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
104
        vpcNatGatewaySynced           cache.InformerSynced
105
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
106
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
107
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
108
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
109
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
110
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
112
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
113
        vpcNatGwKeyMutex              keymutex.KeyMutex
114
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
115

116
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
117
        vpcEgressGatewaySynced           cache.InformerSynced
118
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
119
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
120
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
121

122
        // bgpConfLister/evpnConfLister are published asynchronously by the
123
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
124
        // read by VEG worker goroutines. Atomic pointers keep the read/write
125
        // race-free without locking the hot reconcile path.
126
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
127
        bgpConfSynced  cache.InformerSynced
128
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
129
        evpnConfSynced cache.InformerSynced
130

131
        routerLBRuleLister      kubeovnlister.RouterLBRuleLister
132
        routerLBRuleSynced      cache.InformerSynced
133
        addRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
134
        updateRouterLBRuleQueue workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
135
        delRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
136

137
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
138
        switchLBRuleSynced      cache.InformerSynced
139
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
140
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
141
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
142

143
        vpcDNSLister           kubeovnlister.VpcDnsLister
144
        vpcDNSSynced           cache.InformerSynced
145
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
146
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
147

148
        subnetsLister           kubeovnlister.SubnetLister
149
        subnetSynced            cache.InformerSynced
150
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
151
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
152
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
153
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
154
        subnetKeyMutex          keymutex.KeyMutex
155

156
        ippoolLister            kubeovnlister.IPPoolLister
157
        ippoolSynced            cache.InformerSynced
158
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
159
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
160
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
161
        ippoolKeyMutex          keymutex.KeyMutex
162

163
        ipsLister     kubeovnlister.IPLister
164
        ipSynced      cache.InformerSynced
165
        ipIndexer     cache.Indexer
166
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
168
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
169

170
        virtualIpsLister          kubeovnlister.VipLister
171
        virtualIpsSynced          cache.InformerSynced
172
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
173
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
174
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
175
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
176

177
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
178
        iptablesEipSynced      cache.InformerSynced
179
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
180
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
181
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
182
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
183

184
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
185
        iptablesFipSynced      cache.InformerSynced
186
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
187
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
188
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
189

190
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
191
        iptablesDnatRuleSynced      cache.InformerSynced
192
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
193
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
194
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
195

196
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
197
        iptablesSnatRuleSynced      cache.InformerSynced
198
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
199
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
200
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
201

202
        ovnEipsLister     kubeovnlister.OvnEipLister
203
        ovnEipSynced      cache.InformerSynced
204
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
205
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
206
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
207
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
208

209
        ovnFipsLister     kubeovnlister.OvnFipLister
210
        ovnFipSynced      cache.InformerSynced
211
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
212
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
213
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
214

215
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
216
        ovnSnatRuleSynced      cache.InformerSynced
217
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
218
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
219
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
220

221
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
222
        ovnDnatRuleSynced      cache.InformerSynced
223
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
224
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
225
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
226

227
        providerNetworksLister kubeovnlister.ProviderNetworkLister
228
        providerNetworkSynced  cache.InformerSynced
229

230
        vlansLister     kubeovnlister.VlanLister
231
        vlanSynced      cache.InformerSynced
232
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
233
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
234
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
235
        vlanKeyMutex    keymutex.KeyMutex
236

237
        namespacesLister  v1.NamespaceLister
238
        namespacesSynced  cache.InformerSynced
239
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
240
        nsKeyMutex        keymutex.KeyMutex
241

242
        nodesLister     v1.NodeLister
243
        nodesSynced     cache.InformerSynced
244
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
245
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
246
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
247
        nodeKeyMutex    keymutex.KeyMutex
248

249
        servicesLister     v1.ServiceLister
250
        serviceSynced      cache.InformerSynced
251
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
252
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
253
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
254
        svcKeyMutex        keymutex.KeyMutex
255

256
        endpointSlicesLister          discoveryv1.EndpointSliceLister
257
        endpointSlicesSynced          cache.InformerSynced
258
        epsIndexer                    cache.Indexer
259
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
260
        epKeyMutex                    keymutex.KeyMutex
261

262
        deploymentsLister appsv1.DeploymentLister
263
        deploymentsSynced cache.InformerSynced
264

265
        npsLister     netv1.NetworkPolicyLister
266
        npsSynced     cache.InformerSynced
267
        npIndexer     cache.Indexer
268
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
269
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
270
        npKeyMutex    keymutex.KeyMutex
271

272
        sgsLister          kubeovnlister.SecurityGroupLister
273
        sgSynced           cache.InformerSynced
274
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
275
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
276
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
277
        sgKeyMutex         keymutex.KeyMutex
278

279
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
280
        qosPolicySynced      cache.InformerSynced
281
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
282
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
283
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
284

285
        configMapsLister v1.ConfigMapLister
286
        configMapsSynced cache.InformerSynced
287

288
        anpsLister     anplister.AdminNetworkPolicyLister
289
        anpsSynced     cache.InformerSynced
290
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
291
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
292
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
293
        anpKeyMutex    keymutex.KeyMutex
294

295
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
296
        dnsNameResolverIndexer          cache.Indexer
297
        dnsNameResolversSynced          cache.InformerSynced
298
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
299
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
300

301
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
302
        banpsSynced     cache.InformerSynced
303
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
304
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
305
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
306
        banpKeyMutex    keymutex.KeyMutex
307

308
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
309
        cnpsSynced     cache.InformerSynced
310
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
311
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
312
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
313
        cnpKeyMutex    keymutex.KeyMutex
314

315
        csrLister           certListerv1.CertificateSigningRequestLister
316
        csrSynced           cache.InformerSynced
317
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
318

319
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
320
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
321
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
322

323
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
324
        netAttachSynced          cache.InformerSynced
325
        netAttachInformerFactory netAttach.SharedInformerFactory
326

327
        serviceCIDRStore           *util.ServiceCIDRStore
328
        serviceCIDRLister          netv1.ServiceCIDRLister
329
        serviceCIDRSynced          cache.InformerSynced
330
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
331

332
        recorder               record.EventRecorder
333
        informerFactory        kubeinformers.SharedInformerFactory
334
        cmInformerFactory      kubeinformers.SharedInformerFactory
335
        deployInformerFactory  kubeinformers.SharedInformerFactory
336
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
337
        anpInformerFactory     anpinformer.SharedInformerFactory
338

339
        // Database health check
340
        dbFailureCount int
341

342
        distributedSubnetNeedSync atomic.Bool
343
}
344

345
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
346
        if rateLimiter == nil {
2✔
347
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
348
        }
1✔
349
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
350
}
351

352
// Run creates and runs a new ovn controller
353
func Run(ctx context.Context, config *Configuration) {
×
354
        klog.V(4).Info("Creating event broadcaster")
×
355
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
356
        eventBroadcaster.StartLogging(klog.Infof)
×
357
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
358
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
359
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
360
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
361
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
362
        )
×
363

×
364
        var err error
×
365
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
366
                kubeinformers.WithTransform(util.TrimPodForController),
×
367
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
368
                        listOption.AllowWatchBookmarks = true
×
369
                }))
×
370
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
371
                kubeinformers.WithNamespace(config.PodNamespace),
×
372
                kubeinformers.WithTransform(util.TrimManagedFields),
×
373
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
374
                        listOption.AllowWatchBookmarks = true
×
375
                }))
×
376
        // deployment informer used to list/watch vpc egress gateway and nat gateway workloads
377
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
378
                kubeinformers.WithTransform(util.TrimManagedFields),
×
379
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
380
                        listOption.AllowWatchBookmarks = true
×
381
                }))
×
382
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
383
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
384
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
385
                        listOption.AllowWatchBookmarks = true
×
386
                }))
×
387
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
388
                anpinformer.WithTransform(util.TrimManagedFields),
×
389
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
390
                        listOption.AllowWatchBookmarks = true
×
391
                }))
×
392
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
393
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
394
                        listOption.AllowWatchBookmarks = true
×
395
                }),
×
396
        )
397
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
398
                informer.WithTransform(util.TrimManagedFields),
×
399
        )
×
400
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
401
        // failed list/watch does not contaminate the main informer factory.
×
402
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
403
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
404
                        listOption.AllowWatchBookmarks = true
×
405
                }),
×
406
        )
407

408
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
409
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
410
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
411
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
412
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
413
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
414
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
415
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
416
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
417
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
418
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
419
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
420
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
421
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
422
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
423
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
424
        podInformer := informerFactory.Core().V1().Pods()
×
425
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
426
        nodeInformer := informerFactory.Core().V1().Nodes()
×
427
        serviceInformer := informerFactory.Core().V1().Services()
×
428
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
429
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
430
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
431
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
432
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
433
        routerLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().RouterLBRules()
×
434
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
435
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
436
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
437
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
438
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
439
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
440
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
441
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
442
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
443
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
444
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
445
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
446

×
447
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
448
        controller := &Controller{
×
449
                config:             config,
×
450
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
451
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
452
                ipam:               ovnipam.NewIPAM(),
×
453
                namedPort:          NewNamedPort(),
×
454

×
455
                vpcsLister:           vpcInformer.Lister(),
×
456
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
457
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
458
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
459
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
460
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
461
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
462

×
463
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
464
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
465
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
466
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
467
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
468
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
469
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
470
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
471
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
472
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
473
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
474
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
475
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
476
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
477
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
478
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
479
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
480

×
481
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
482
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
483

×
484
                subnetsLister:           subnetInformer.Lister(),
×
485
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
486
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
487
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
488
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
489
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
490
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
491

×
492
                ippoolLister:            ippoolInformer.Lister(),
×
493
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
494
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
495
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
496
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
497
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
498

×
499
                ipsLister:     ipInformer.Lister(),
×
500
                ipSynced:      ipInformer.Informer().HasSynced,
×
501
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
502
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
503
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
504

×
505
                virtualIpsLister:          virtualIPInformer.Lister(),
×
506
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
507
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
508
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
509
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
510
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
511

×
512
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
513
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
514
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
515
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
516
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
517
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
518

×
519
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
520
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
521
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
522
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
523
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
524

×
525
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
526
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
527
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
528
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
529
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
530

×
531
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
532
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
533
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
534
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
535
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
536

×
537
                vlansLister:     vlanInformer.Lister(),
×
538
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
539
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
540
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
541
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
542
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
543

×
544
                providerNetworksLister: providerNetworkInformer.Lister(),
×
545
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
546

×
547
                podsLister:          podInformer.Lister(),
×
548
                podsSynced:          podInformer.Informer().HasSynced,
×
549
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
550
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
551
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
552
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
553
                                Name:          "DeletePod",
×
554
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
555
                        },
×
556
                ),
×
557
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
558
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
559

×
560
                namespacesLister:  namespaceInformer.Lister(),
×
561
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
562
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
563
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
564

×
565
                nodesLister:     nodeInformer.Lister(),
×
566
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
567
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
568
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
569
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
570
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
571

×
572
                servicesLister:     serviceInformer.Lister(),
×
573
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
574
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
575
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
576
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
577
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
578

×
579
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
580
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
581
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
582
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
583

×
584
                deploymentsLister: deploymentInformer.Lister(),
×
585
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
586

×
587
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
588
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
589
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
590
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
591
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
592

×
593
                configMapsLister: configMapInformer.Lister(),
×
594
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
595

×
596
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
597
                sgsLister:          sgInformer.Lister(),
×
598
                sgSynced:           sgInformer.Informer().HasSynced,
×
599
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
600
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
601
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
602

×
603
                ovnEipsLister:     ovnEipInformer.Lister(),
×
604
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
605
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
606
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
607
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
608
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
609

×
610
                ovnFipsLister:     ovnFipInformer.Lister(),
×
611
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
612
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
613
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
614
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
615

×
616
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
617
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
618
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
619
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
620
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
621

×
622
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
623
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
624
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
625
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
626
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
627

×
628
                csrLister:           csrInformer.Lister(),
×
629
                csrSynced:           csrInformer.Informer().HasSynced,
×
630
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
631

×
632
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
633
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
634
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
635

×
636
                netAttachLister:          netAttachInformer.Lister(),
×
637
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
638
                netAttachInformerFactory: attachNetInformerFactory,
×
639

×
640
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
641
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
642

×
643
                recorder:               recorder,
×
644
                informerFactory:        informerFactory,
×
645
                cmInformerFactory:      cmInformerFactory,
×
646
                deployInformerFactory:  deployInformerFactory,
×
647
                kubeovnInformerFactory: kubeovnInformerFactory,
×
648
                anpInformerFactory:     anpInformerFactory,
×
649
        }
×
650

×
651
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
652
                config.OvnNbAddr,
×
653
                config.OvnTimeout,
×
654
                config.OvsDbConnectTimeout,
×
655
                config.OvsDbInactivityTimeout,
×
656
                config.OvsDbConnectMaxRetry,
×
657
        ); err != nil {
×
658
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
659
        }
×
660
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
661
                config.OvnSbAddr,
×
662
                config.OvnTimeout,
×
663
                config.OvsDbConnectTimeout,
×
664
                config.OvsDbInactivityTimeout,
×
665
                config.OvsDbConnectMaxRetry,
×
666
        ); err != nil {
×
667
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
668
        }
×
669
        if config.EnableLb {
×
670
                controller.routerLBRuleLister = routerLBRuleInformer.Lister()
×
671
                controller.routerLBRuleSynced = routerLBRuleInformer.Informer().HasSynced
×
672
                controller.addRouterLBRuleQueue = newTypedRateLimitingQueue("AddRouterLBRule", custCrdRateLimiter)
×
673
                controller.delRouterLBRuleQueue = newTypedRateLimitingQueue(
×
674
                        "DeleteRouterLBRule",
×
675
                        workqueue.NewTypedMaxOfRateLimiter(
×
676
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
677
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
678
                        ),
×
679
                )
×
680
                controller.updateRouterLBRuleQueue = newTypedRateLimitingQueue(
×
681
                        "UpdateRouterLBRule",
×
682
                        workqueue.NewTypedMaxOfRateLimiter(
×
683
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
684
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
685
                        ),
×
686
                )
×
687

×
688
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
689
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
690
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
691
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
692
                        "DeleteSwitchLBRule",
×
693
                        workqueue.NewTypedMaxOfRateLimiter(
×
694
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
695
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
696
                        ),
×
697
                )
×
698
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
699
                        "UpdateSwitchLBRule",
×
700
                        workqueue.NewTypedMaxOfRateLimiter(
×
701
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
702
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
703
                        ),
×
704
                )
×
705

×
706
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
707
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
708
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
709
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
710
        }
×
711

712
        if config.EnableNP {
×
713
                controller.npsLister = npInformer.Lister()
×
714
                controller.npsSynced = npInformer.Informer().HasSynced
×
715
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
716
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
717
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
718
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
719
        }
×
720

721
        if config.EnableANP {
×
722
                controller.anpsLister = anpInformer.Lister()
×
723
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
724
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
725
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
726
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
727
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
728

×
729
                controller.banpsLister = banpInformer.Lister()
×
730
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
731
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
732
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
733
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
734
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
735

×
736
                controller.cnpsLister = cnpInformer.Lister()
×
737
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
738
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
739
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
740
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
741
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
742
        }
×
743

744
        if config.EnableDNSNameResolver {
×
745
                if !config.EnableANP {
×
746
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
747
                }
×
748
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
749
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
750
                if err := dnsNameResolverInformer.Informer().AddIndexers(cache.Indexers{
×
751
                        IndexDNSNameResolverByName: indexDNSNameResolverByName,
×
752
                }); err != nil {
×
753
                        util.LogFatalAndExit(err, "failed to add DNSNameResolver indexer")
×
754
                }
×
755
                controller.dnsNameResolverIndexer = dnsNameResolverInformer.Informer().GetIndexer()
×
756
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
757
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
758
        }
759

NEW
760
        if err := controller.setupIndexers(vpcInformer.Informer(), podInformer.Informer(), endpointSliceInformer.Informer(), ipInformer.Informer()); err != nil {
×
761
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
762
        }
×
763

764
        defer controller.shutdown()
×
765
        klog.Info("Starting OVN controller")
×
766

×
767
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
768
        // NAD CRD is optional, so we check if it exists before starting the informer
×
769
        controller.StartNetAttachInformerFactory(ctx)
×
770

×
771
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
772
        // don't have the API at all. Best-effort start with periodic retry.
×
773
        controller.StartServiceCIDRInformerFactory(ctx)
×
774

×
775
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
776
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
777
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
778
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
779

×
780
        // Start the informer factories; their caches must be synced (see cache.WaitForCacheSync below) before starting workers
×
781
        controller.informerFactory.Start(ctx.Done())
×
782
        controller.cmInformerFactory.Start(ctx.Done())
×
783
        controller.deployInformerFactory.Start(ctx.Done())
×
784
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
785
        controller.anpInformerFactory.Start(ctx.Done())
×
786
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
787

×
788
        klog.Info("Waiting for informer caches to sync")
×
789
        cacheSyncs := []cache.InformerSynced{
×
790
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
791
                controller.vpcSynced, controller.subnetSynced,
×
792
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
793
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
794
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
795
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
796
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
797
                controller.ovnDnatRuleSynced,
×
798
        }
×
799
        if controller.config.EnableLb {
×
800
                cacheSyncs = append(cacheSyncs, controller.routerLBRuleSynced, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
801
        }
×
802
        if controller.config.EnableNP {
×
803
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
804
        }
×
805
        if controller.config.EnableANP {
×
806
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
807
        }
×
808
        if controller.config.EnableDNSNameResolver {
×
809
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
810
        }
×
811

812
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
813
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
814
        }
×
815

816
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
817
                AddFunc:    controller.enqueueAddPod,
×
818
                DeleteFunc: controller.enqueueDeletePod,
×
819
                UpdateFunc: controller.enqueueUpdatePod,
×
820
        }); err != nil {
×
821
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
822
        }
×
823

824
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
825
                AddFunc:    controller.enqueueAddNamespace,
×
826
                UpdateFunc: controller.enqueueUpdateNamespace,
×
827
                DeleteFunc: controller.enqueueDeleteNamespace,
×
828
        }); err != nil {
×
829
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
830
        }
×
831

832
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
833
                AddFunc:    controller.enqueueAddNode,
×
834
                UpdateFunc: controller.enqueueUpdateNode,
×
835
                DeleteFunc: controller.enqueueDeleteNode,
×
836
        }); err != nil {
×
837
                util.LogFatalAndExit(err, "failed to add node event handler")
×
838
        }
×
839

840
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
841
                AddFunc:    controller.enqueueAddService,
×
842
                DeleteFunc: controller.enqueueDeleteService,
×
843
                UpdateFunc: controller.enqueueUpdateService,
×
844
        }); err != nil {
×
845
                util.LogFatalAndExit(err, "failed to add service event handler")
×
846
        }
×
847

848
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
849
                AddFunc:    controller.enqueueAddEndpointSlice,
×
850
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
853
        }
×
854

855
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
×
856
                FilterFunc: func(obj any) bool {
×
857
                        if deploy, ok := obj.(*appsv1api.Deployment); ok {
×
858
                                // Only watch deployments with VpcEgressGatewayLabel or VpcNatGatewayLabel
×
859
                                _, hasNatGwLabel := deploy.Labels[util.VpcNatGatewayLabel]
×
860
                                _, hasEgressGwLabel := deploy.Labels[util.VpcEgressGatewayLabel]
×
861
                                return hasNatGwLabel || hasEgressGwLabel
×
862
                        }
×
863
                        return false
×
864
                },
865
                Handler: cache.ResourceEventHandlerFuncs{
866
                        AddFunc:    controller.enqueueAddDeployment,
867
                        UpdateFunc: controller.enqueueUpdateDeployment,
868
                },
869
        }); err != nil {
×
870
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
871
        }
×
872

873
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
874
                AddFunc:    controller.enqueueAddVpc,
×
875
                UpdateFunc: controller.enqueueUpdateVpc,
×
876
                DeleteFunc: controller.enqueueDelVpc,
×
877
        }); err != nil {
×
878
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
879
        }
×
880

881
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
882
                AddFunc:    controller.enqueueAddVpcNatGw,
×
883
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
884
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
885
        }); err != nil {
×
886
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
887
        }
×
888

889
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
890
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
891
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
892
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
893
        }); err != nil {
×
894
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
895
        }
×
896

897
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
898
                AddFunc:    controller.enqueueAddSubnet,
×
899
                UpdateFunc: controller.enqueueUpdateSubnet,
×
900
                DeleteFunc: controller.enqueueDeleteSubnet,
×
901
        }); err != nil {
×
902
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
903
        }
×
904

905
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
906
                AddFunc:    controller.enqueueAddIPPool,
×
907
                UpdateFunc: controller.enqueueUpdateIPPool,
×
908
                DeleteFunc: controller.enqueueDeleteIPPool,
×
909
        }); err != nil {
×
910
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
911
        }
×
912

913
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
914
                AddFunc:    controller.enqueueAddIP,
×
915
                UpdateFunc: controller.enqueueUpdateIP,
×
916
                DeleteFunc: controller.enqueueDelIP,
×
917
        }); err != nil {
×
918
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
919
        }
×
920

921
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
922
                AddFunc:    controller.enqueueAddVlan,
×
923
                DeleteFunc: controller.enqueueDelVlan,
×
924
                UpdateFunc: controller.enqueueUpdateVlan,
×
925
        }); err != nil {
×
926
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
927
        }
×
928

929
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                AddFunc:    controller.enqueueAddSg,
×
931
                DeleteFunc: controller.enqueueDeleteSg,
×
932
                UpdateFunc: controller.enqueueUpdateSg,
×
933
        }); err != nil {
×
934
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
935
        }
×
936

937
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
938
                AddFunc:    controller.enqueueAddVirtualIP,
×
939
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
940
                DeleteFunc: controller.enqueueDelVirtualIP,
×
941
        }); err != nil {
×
942
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
943
        }
×
944

945
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
946
                AddFunc:    controller.enqueueAddIptablesEip,
×
947
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
948
                DeleteFunc: controller.enqueueDelIptablesEip,
×
949
        }); err != nil {
×
950
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
951
        }
×
952

953
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
954
                AddFunc:    controller.enqueueAddIptablesFip,
×
955
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
956
                DeleteFunc: controller.enqueueDelIptablesFip,
×
957
        }); err != nil {
×
958
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
959
        }
×
960

961
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
962
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
963
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
964
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
965
        }); err != nil {
×
966
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
967
        }
×
968

969
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
970
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
971
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
972
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
973
        }); err != nil {
×
974
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
975
        }
×
976

977
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
978
                AddFunc:    controller.enqueueAddOvnEip,
×
979
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
980
                DeleteFunc: controller.enqueueDelOvnEip,
×
981
        }); err != nil {
×
982
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
983
        }
×
984

985
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
986
                AddFunc:    controller.enqueueAddOvnFip,
×
987
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
988
                DeleteFunc: controller.enqueueDelOvnFip,
×
989
        }); err != nil {
×
990
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
991
        }
×
992

993
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
994
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
995
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
996
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
997
        }); err != nil {
×
998
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
999
        }
×
1000

1001
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1002
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
1003
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
1004
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
1005
        }); err != nil {
×
1006
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
1007
        }
×
1008

1009
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1010
                AddFunc:    controller.enqueueAddQoSPolicy,
×
1011
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
1012
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
1013
        }); err != nil {
×
1014
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
1015
        }
×
1016

1017
        if config.EnableLb {
×
1018
                if _, err = routerLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1019
                        AddFunc:    controller.enqueueAddRouterLBRule,
×
1020
                        UpdateFunc: controller.enqueueUpdateRouterLBRule,
×
1021
                        DeleteFunc: controller.enqueueDeleteRouterLBRule,
×
1022
                }); err != nil {
×
1023
                        util.LogFatalAndExit(err, "failed to add router lb rule event handler")
×
1024
                }
×
1025

1026
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1027
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
1028
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
1029
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
1030
                }); err != nil {
×
1031
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
1032
                }
×
1033

1034
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1035
                        AddFunc:    controller.enqueueAddVpcDNS,
×
1036
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
1037
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
1038
                }); err != nil {
×
1039
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
1040
                }
×
1041
        }
1042

1043
        if config.EnableNP {
×
1044
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1045
                        AddFunc:    controller.enqueueAddNp,
×
1046
                        UpdateFunc: controller.enqueueUpdateNp,
×
1047
                        DeleteFunc: controller.enqueueDeleteNp,
×
1048
                }); err != nil {
×
1049
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
1050
                }
×
1051
        }
1052

1053
        if config.EnableANP {
×
1054
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1055
                        AddFunc:    controller.enqueueAddAnp,
×
1056
                        UpdateFunc: controller.enqueueUpdateAnp,
×
1057
                        DeleteFunc: controller.enqueueDeleteAnp,
×
1058
                }); err != nil {
×
1059
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1060
                }
×
1061

1062
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1063
                        AddFunc:    controller.enqueueAddBanp,
×
1064
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1065
                        DeleteFunc: controller.enqueueDeleteBanp,
×
1066
                }); err != nil {
×
1067
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1068
                }
×
1069

1070
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1071
                        AddFunc:    controller.enqueueAddCnp,
×
1072
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1073
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1074
                }); err != nil {
×
1075
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1076
                }
×
1077

1078
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1079
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1080
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1081
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1082
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1083
        }
1084

1085
        if config.EnableDNSNameResolver {
×
1086
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1087
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1088
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1089
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1090
                }); err != nil {
×
1091
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1092
                }
×
1093
        }
1094

1095
        if config.EnableOVNIPSec {
×
1096
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1097
                        AddFunc:    controller.enqueueAddCsr,
×
1098
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1099
                        // no need to add delete func for csr
×
1100
                }); err != nil {
×
1101
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1102
                }
×
1103
        }
1104

1105
        controller.Run(ctx)
×
1106
}
1107

1108
// Run will set up the event handlers for types we are interested in, as well
1109
// as syncing informer caches and starting workers. It will block until stopCh
1110
// is closed, at which point it will shutdown the workqueue and wait for
1111
// workers to finish processing their current work items.
1112
func (c *Controller) Run(ctx context.Context) {
×
1113
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1114
        // Otherwise, the init process should be placed after all workers have already started working
×
1115
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1116
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1117
        }
×
1118

1119
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1120
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1121
        }
×
1122

1123
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1124
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1125
        }
×
1126

1127
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1128
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1129
        }
×
1130

1131
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1132
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1133
        }
×
1134

1135
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1136
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1137
        }
×
1138

1139
        if err := c.InitOVN(); err != nil {
×
1140
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1141
        }
×
1142

1143
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1144
        if err := c.syncIPCR(); err != nil {
×
1145
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1146
        }
×
1147

1148
        if err := c.syncFinalizers(); err != nil {
×
1149
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1150
        }
×
1151

1152
        if err := c.InitIPAM(); err != nil {
×
1153
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1154
        }
×
1155

1156
        if err := c.syncNodeRoutes(); err != nil {
×
1157
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1158
        }
×
1159

1160
        if err := c.syncSubnetCR(); err != nil {
×
1161
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1162
        }
×
1163

1164
        if err := c.syncVlanCR(); err != nil {
×
1165
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1166
        }
×
1167

1168
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1169
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1170
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1171
                }
×
1172
        }
1173

1174
        // start workers to do all the network operations
1175
        c.startWorkers(ctx)
×
1176

×
1177
        c.initResourceOnce()
×
1178
        <-ctx.Done()
×
1179
        klog.Info("Shutting down workers")
×
1180

×
1181
        c.OVNNbClient.Close()
×
1182
        c.OVNSbClient.Close()
×
1183
}
1184

1185
func (c *Controller) dbStatus() {
×
1186
        const maxFailures = 5
×
1187

×
1188
        done := make(chan error, 2)
×
1189
        go func() {
×
1190
                done <- c.OVNNbClient.Echo(context.Background())
×
1191
        }()
×
1192
        go func() {
×
1193
                done <- c.OVNSbClient.Echo(context.Background())
×
1194
        }()
×
1195

1196
        resultsReceived := 0
×
1197
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1198

×
1199
        for resultsReceived < 2 {
×
1200
                select {
×
1201
                case err := <-done:
×
1202
                        resultsReceived++
×
1203
                        if err != nil {
×
1204
                                c.dbFailureCount++
×
1205
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1206
                                if c.dbFailureCount >= maxFailures {
×
1207
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1208
                                }
×
1209
                                return
×
1210
                        }
1211
                case <-timeout:
×
1212
                        c.dbFailureCount++
×
1213
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1214
                        if c.dbFailureCount >= maxFailures {
×
1215
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1216
                        }
×
1217
                        return
×
1218
                }
1219
        }
1220

1221
        if c.dbFailureCount > 0 {
×
1222
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1223
                c.dbFailureCount = 0
×
1224
        }
×
1225
}
1226

1227
func (c *Controller) shutdown() {
×
1228
        utilruntime.HandleCrash()
×
1229

×
1230
        c.addOrUpdatePodQueue.ShutDown()
×
1231
        c.deletePodQueue.ShutDown()
×
1232
        c.updatePodSecurityQueue.ShutDown()
×
1233

×
1234
        c.addNamespaceQueue.ShutDown()
×
1235

×
1236
        c.addOrUpdateSubnetQueue.ShutDown()
×
1237
        c.deleteSubnetQueue.ShutDown()
×
1238
        c.updateSubnetStatusQueue.ShutDown()
×
1239
        c.syncVirtualPortsQueue.ShutDown()
×
1240

×
1241
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1242
        c.updateIPPoolStatusQueue.ShutDown()
×
1243
        c.deleteIPPoolQueue.ShutDown()
×
1244

×
1245
        c.addNodeQueue.ShutDown()
×
1246
        c.updateNodeQueue.ShutDown()
×
1247
        c.deleteNodeQueue.ShutDown()
×
1248

×
1249
        c.addServiceQueue.ShutDown()
×
1250
        c.deleteServiceQueue.ShutDown()
×
1251
        c.updateServiceQueue.ShutDown()
×
1252
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1253

×
1254
        c.addVlanQueue.ShutDown()
×
1255
        c.delVlanQueue.ShutDown()
×
1256
        c.updateVlanQueue.ShutDown()
×
1257

×
1258
        c.addOrUpdateVpcQueue.ShutDown()
×
1259
        c.updateVpcStatusQueue.ShutDown()
×
1260
        c.delVpcQueue.ShutDown()
×
1261

×
1262
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1263
        c.initVpcNatGatewayQueue.ShutDown()
×
1264
        c.delVpcNatGatewayQueue.ShutDown()
×
1265
        c.updateVpcEipQueue.ShutDown()
×
1266
        c.updateVpcFloatingIPQueue.ShutDown()
×
1267
        c.updateVpcDnatQueue.ShutDown()
×
1268
        c.updateVpcSnatQueue.ShutDown()
×
1269
        c.updateVpcSubnetQueue.ShutDown()
×
1270

×
1271
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1272
        c.delVpcEgressGatewayQueue.ShutDown()
×
1273

×
1274
        if c.config.EnableLb {
×
1275
                c.addRouterLBRuleQueue.ShutDown()
×
1276
                c.delRouterLBRuleQueue.ShutDown()
×
1277
                c.updateRouterLBRuleQueue.ShutDown()
×
1278

×
1279
                c.addSwitchLBRuleQueue.ShutDown()
×
1280
                c.delSwitchLBRuleQueue.ShutDown()
×
1281
                c.updateSwitchLBRuleQueue.ShutDown()
×
1282

×
1283
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1284
                c.delVpcDNSQueue.ShutDown()
×
1285
        }
×
1286

1287
        c.addIPQueue.ShutDown()
×
1288
        c.updateIPQueue.ShutDown()
×
1289
        c.delIPQueue.ShutDown()
×
1290

×
1291
        c.addVirtualIPQueue.ShutDown()
×
1292
        c.updateVirtualIPQueue.ShutDown()
×
1293
        c.updateVirtualParentsQueue.ShutDown()
×
1294
        c.delVirtualIPQueue.ShutDown()
×
1295

×
1296
        c.addIptablesEipQueue.ShutDown()
×
1297
        c.updateIptablesEipQueue.ShutDown()
×
1298
        c.resetIptablesEipQueue.ShutDown()
×
1299
        c.delIptablesEipQueue.ShutDown()
×
1300

×
1301
        c.addIptablesFipQueue.ShutDown()
×
1302
        c.updateIptablesFipQueue.ShutDown()
×
1303
        c.delIptablesFipQueue.ShutDown()
×
1304

×
1305
        c.addIptablesDnatRuleQueue.ShutDown()
×
1306
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1307
        c.delIptablesDnatRuleQueue.ShutDown()
×
1308

×
1309
        c.addIptablesSnatRuleQueue.ShutDown()
×
1310
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1311
        c.delIptablesSnatRuleQueue.ShutDown()
×
1312

×
1313
        c.addQoSPolicyQueue.ShutDown()
×
1314
        c.updateQoSPolicyQueue.ShutDown()
×
1315
        c.delQoSPolicyQueue.ShutDown()
×
1316

×
1317
        c.addOvnEipQueue.ShutDown()
×
1318
        c.updateOvnEipQueue.ShutDown()
×
1319
        c.resetOvnEipQueue.ShutDown()
×
1320
        c.delOvnEipQueue.ShutDown()
×
1321

×
1322
        c.addOvnFipQueue.ShutDown()
×
1323
        c.updateOvnFipQueue.ShutDown()
×
1324
        c.delOvnFipQueue.ShutDown()
×
1325

×
1326
        c.addOvnSnatRuleQueue.ShutDown()
×
1327
        c.updateOvnSnatRuleQueue.ShutDown()
×
1328
        c.delOvnSnatRuleQueue.ShutDown()
×
1329

×
1330
        c.addOvnDnatRuleQueue.ShutDown()
×
1331
        c.updateOvnDnatRuleQueue.ShutDown()
×
1332
        c.delOvnDnatRuleQueue.ShutDown()
×
1333

×
1334
        if c.config.EnableNP {
×
1335
                c.updateNpQueue.ShutDown()
×
1336
                c.deleteNpQueue.ShutDown()
×
1337
        }
×
1338
        if c.config.EnableANP {
×
1339
                c.addAnpQueue.ShutDown()
×
1340
                c.updateAnpQueue.ShutDown()
×
1341
                c.deleteAnpQueue.ShutDown()
×
1342

×
1343
                c.addBanpQueue.ShutDown()
×
1344
                c.updateBanpQueue.ShutDown()
×
1345
                c.deleteBanpQueue.ShutDown()
×
1346

×
1347
                c.addCnpQueue.ShutDown()
×
1348
                c.updateCnpQueue.ShutDown()
×
1349
                c.deleteCnpQueue.ShutDown()
×
1350
        }
×
1351

1352
        if c.config.EnableDNSNameResolver {
×
1353
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1354
                c.deleteDNSNameResolverQueue.ShutDown()
×
1355
        }
×
1356

1357
        c.addOrUpdateSgQueue.ShutDown()
×
1358
        c.delSgQueue.ShutDown()
×
1359
        c.syncSgPortsQueue.ShutDown()
×
1360

×
1361
        c.addOrUpdateCsrQueue.ShutDown()
×
1362

×
1363
        if c.config.EnableLiveMigrationOptimize {
×
1364
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1365
        }
×
1366
}
1367

1368
func (c *Controller) startWorkers(ctx context.Context) {
×
1369
        klog.Info("Starting workers")
×
1370

×
1371
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1373
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1374

×
1375
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1376
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1377
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1378
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1379
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1380
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1381
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1382
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1383
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1384
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1385
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1386
        // add default and join subnet and wait them ready
×
1387
        for range c.config.WorkerNum {
×
1388
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1389
        }
×
1390
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1391
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1392
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1393
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1394
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1395
                klog.Infof("wait for subnets %v ready", subnets)
×
1396

×
1397
                return c.allSubnetReady(subnets...)
×
1398
        })
×
1399
        if err != nil {
×
1400
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1401
        }
×
1402

1403
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1404
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1405
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1406

×
1407
        // run node worker before handle any pods
×
1408
        for range c.config.WorkerNum {
×
1409
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1410
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1411
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1412
        }
×
1413
        for {
×
1414
                ready := true
×
1415
                time.Sleep(3 * time.Second)
×
1416
                nodes, err := c.nodesLister.List(labels.Everything())
×
1417
                if err != nil {
×
1418
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1419
                }
×
1420
                for _, node := range nodes {
×
1421
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1422
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1423
                                ready = false
×
1424
                                break
×
1425
                        }
1426
                }
1427
                if ready {
×
1428
                        break
×
1429
                }
1430
        }
1431

1432
        if c.config.EnableLb {
×
1433
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1434
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1435
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1436

×
1437
                go wait.Until(runWorker("add/update router lb rule", c.addRouterLBRuleQueue, c.handleAddOrUpdateRouterLBRule), time.Second, ctx.Done())
×
1438
                go wait.Until(runWorker("delete router lb rule", c.delRouterLBRuleQueue, c.handleDelRouterLBRule), time.Second, ctx.Done())
×
1439
                go wait.Until(runWorker("update router lb rule", c.updateRouterLBRuleQueue, c.handleUpdateRouterLBRule), time.Second, ctx.Done())
×
1440

×
1441
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1442
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1443
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1444

×
1445
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1446
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1447
                go wait.Until(func() {
×
1448
                        c.resyncVpcDNSConfig()
×
1449
                }, 5*time.Second, ctx.Done())
×
1450
        }
1451

1452
        for range c.config.WorkerNum {
×
1453
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1454
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1455
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1456

×
1457
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1458
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1459
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1460
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1461
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1462

×
1463
                if c.config.EnableLb {
×
1464
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1465
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1466
                }
×
1467

1468
                if c.config.EnableNP {
×
1469
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1470
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1471
                }
×
1472

1473
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1474
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1475
        }
1476

1477
        if c.config.EnableEipSnat {
×
1478
                go wait.Until(func() {
×
1479
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1480
                        c.resyncExternalGateway()
×
1481
                }, time.Second, ctx.Done())
×
1482

1483
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1484
                c.OVNNbClient.MonitorBFD()
×
1485
        }
1486
        // TODO: we should merge these two vpc nat config into one config and resync them together
1487
        go wait.Until(func() {
×
1488
                c.resyncVpcNatGwConfig()
×
1489
        }, time.Second, ctx.Done())
×
1490

1491
        go wait.Until(func() {
×
1492
                c.resyncVpcNatConfig()
×
1493
        }, time.Second, ctx.Done())
×
1494

1495
        if c.config.GCInterval != 0 {
×
1496
                go wait.Until(func() {
×
1497
                        if err := c.markAndCleanLSP(); err != nil {
×
1498
                                klog.Errorf("gc lsp error: %v", err)
×
1499
                        }
×
1500
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1501
        }
1502

1503
        go wait.Until(func() {
×
1504
                if err := c.inspectPod(); err != nil {
×
1505
                        klog.Errorf("inspection error: %v", err)
×
1506
                }
×
1507
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1508

1509
        if c.config.EnableExternalVpc {
×
1510
                go wait.Until(func() {
×
1511
                        c.syncExternalVpc()
×
1512
                }, 5*time.Second, ctx.Done())
×
1513
        }
1514

1515
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1516
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1517
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1518
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1519

×
1520
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1521
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1522
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1523
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1524

×
1525
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1526
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1527
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1528

×
1529
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1530
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1531
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1532

×
1533
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1534
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1535
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1536

×
1537
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1538

×
1539
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1540
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1541
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1542

×
1543
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1544
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1545
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1546
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1547

×
1548
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1549
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1550
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1551
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1552

×
1553
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1554
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1555
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1556

×
1557
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1558
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1559
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1560

×
1561
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1562
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1563
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1564

×
1565
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1566
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1567
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1568

×
1569
        if c.config.EnableANP {
×
1570
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1571
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1572
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1573

×
1574
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1575
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1576
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1577

×
1578
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1579
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1580
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1581
        }
×
1582

1583
        if c.config.EnableDNSNameResolver {
×
1584
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1585
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1586
        }
×
1587

1588
        if c.config.EnableLiveMigrationOptimize {
×
1589
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1590
        }
×
1591

1592
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1593

×
1594
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1595
}
1596

1597
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1598
        for _, lsName := range subnets {
2✔
1599
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1600
                if err != nil {
1✔
1601
                        klog.Error(err)
×
1602
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1603
                }
×
1604

1605
                if !exist {
2✔
1606
                        return false, nil
1✔
1607
                }
1✔
1608
        }
1609

1610
        return true, nil
1✔
1611
}
1612

1613
func (c *Controller) initResourceOnce() {
×
1614
        c.registerSubnetMetrics()
×
1615

×
1616
        if err := c.initNodeChassis(); err != nil {
×
1617
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1618
        }
×
1619

1620
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1621
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1622
        }
×
1623
        if err := c.syncSecurityGroup(); err != nil {
×
1624
                util.LogFatalAndExit(err, "failed to sync security group")
×
1625
        }
×
1626

1627
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1628
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1629
        }
×
1630

1631
        if err := c.initVpcNatGw(); err != nil {
×
1632
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1633
        }
×
1634
        if c.config.EnableLb {
×
1635
                if err := c.initVpcDNSConfig(); err != nil {
×
1636
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1637
                }
×
1638
        }
1639

1640
        // remove resources in ovndb that not exist any more in kubernetes resources
1641
        // process gc at last in case of affecting other init process
1642
        if err := c.gc(); err != nil {
×
1643
                util.LogFatalAndExit(err, "failed to run gc")
×
1644
        }
×
1645
}
1646

1647
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1648
        item, shutdown := queue.Get()
×
1649
        if shutdown {
×
1650
                return false
×
1651
        }
×
1652

1653
        err := func(item T) error {
×
1654
                defer queue.Done(item)
×
1655
                if err := handler(item); err != nil {
×
1656
                        queue.AddRateLimited(item)
×
1657
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1658
                }
×
1659
                queue.Forget(item)
×
1660
                return nil
×
1661
        }(item)
1662
        if err != nil {
×
1663
                utilruntime.HandleError(err)
×
1664
                return true
×
1665
        }
×
1666
        return true
×
1667
}
1668

1669
func getWorkItemKey(obj any) string {
×
1670
        switch v := obj.(type) {
×
1671
        case string:
×
1672
                return v
×
1673
        case *vpcService:
×
1674
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1675
        case *AdminNetworkPolicyChangedDelta:
×
1676
                return v.key
×
1677
        case *SwitchLBRuleInfo:
×
1678
                return v.Name
×
1679
        default:
×
1680
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1681
                if err != nil {
×
1682
                        utilruntime.HandleError(err)
×
1683
                        return ""
×
1684
                }
×
1685
                return key
×
1686
        }
1687
}
1688

1689
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1690
        return func() {
×
1691
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1692
                }
×
1693
        }
1694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc