• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 26870502843

03 Jun 2026 07:34AM UTC coverage: 25.64% (+0.004%) from 25.636%
26870502843

push

github

web-flow
chore(deps): update golang to v1.26.4 (#6810)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

14781 of 57648 relevant lines covered (25.64%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.12
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        kubeinformers "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        appsv1 "k8s.io/client-go/listers/apps/v1"
25
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
26
        v1 "k8s.io/client-go/listers/core/v1"
27
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
28
        netv1 "k8s.io/client-go/listers/networking/v1"
29
        "k8s.io/client-go/tools/cache"
30
        "k8s.io/client-go/tools/record"
31
        "k8s.io/client-go/util/workqueue"
32
        "k8s.io/klog/v2"
33
        "k8s.io/utils/keymutex"
34
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
35
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
36
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
37
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
38
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
39

40
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
41
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
42
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
43
        "github.com/kubeovn/kube-ovn/pkg/informer"
44
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
45
        "github.com/kubeovn/kube-ovn/pkg/ovs"
46
        "github.com/kubeovn/kube-ovn/pkg/util"
47
)
48

49
// controllerAgentName is the component name used as the event source when
// Run creates the event recorder.
const controllerAgentName = "kube-ovn-controller"
50

51
// Short string keys, one per resource kind handled by the controller.
// NOTE(review): the usage sites are outside this chunk; presumably these are
// keys written into OVN object metadata (external IDs) — confirm against callers.
const (
52
        logicalSwitchKey              = "ls"
53
        logicalRouterKey              = "lr"
54
        portGroupKey                  = "pg"
55
        networkPolicyKey              = "np"
56
        sgKey                         = "sg"
57
        sgsKey                        = "security_groups"
58
        u2oKey                        = "u2o"
59
        adminNetworkPolicyKey         = "anp"
60
        baselineAdminNetworkPolicyKey = "banp"
61
        ippoolKey                     = "ippool"
62
        clusterNetworkPolicyKey       = "cnp"
63
)
64

65
// Controller is the kube-ovn main controller that watches ns/pod/node/svc/ep and operates ovn.
//
// For most resource kinds it holds a lister, an informer-synced check, one or
// more typed rate-limiting work queues and, for many kinds, a keyed mutex. It
// also holds the OVN NB/SB clients, the IPAM instance and the informer
// factories.
66
type Controller struct {
67
        config *Configuration
68

69
        ipam             *ovnipam.IPAM
70
        namedPort        *NamedPort
        // NOTE(review): the four maps below look like priority<->name indexes for
        // ANP/BANP objects, presumably guarded by priorityMapMutex — confirm
        // against their users.
71
        anpPrioNameMap   map[int32]string
72
        anpNamePrioMap   map[string]int32
73
        bnpPrioNameMap   map[int32]string
74
        bnpNamePrioMap   map[string]int32
75
        priorityMapMutex sync.RWMutex
76

        // Clients for the OVN databases (ovs.NbClient / ovs.SbClient).
77
        OVNNbClient ovs.NbClient
78
        OVNSbClient ovs.SbClient
79

80
        // ExternalGatewayType defines the external gateway type, e.g. centralized.
81
        ExternalGatewayType string
82

83
        podsLister             v1.PodLister
84
        podsSynced             cache.InformerSynced
85
        podIndexer             cache.Indexer
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        // bgpConfLister/evpnConfLister are published asynchronously by the
121
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
122
        // read by VEG worker goroutines. Atomic pointers keep the read/write
123
        // race-free without locking the hot reconcile path.
124
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
125
        bgpConfSynced  cache.InformerSynced
126
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
127
        evpnConfSynced cache.InformerSynced
128

129
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
130
        switchLBRuleSynced      cache.InformerSynced
131
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
132
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
133
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
134

135
        vpcDNSLister           kubeovnlister.VpcDnsLister
136
        vpcDNSSynced           cache.InformerSynced
137
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
138
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
139

140
        subnetsLister           kubeovnlister.SubnetLister
141
        subnetSynced            cache.InformerSynced
142
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
143
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
144
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
145
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
146
        subnetKeyMutex          keymutex.KeyMutex
147

148
        ippoolLister            kubeovnlister.IPPoolLister
149
        ippoolSynced            cache.InformerSynced
150
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
151
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
152
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
153
        ippoolKeyMutex          keymutex.KeyMutex
154

155
        ipsLister     kubeovnlister.IPLister
156
        ipSynced      cache.InformerSynced
157
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
158
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
159
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
160

161
        virtualIpsLister          kubeovnlister.VipLister
162
        virtualIpsSynced          cache.InformerSynced
163
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
164
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
165
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
166
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
167

168
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
169
        iptablesEipSynced      cache.InformerSynced
170
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
171
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
172
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
173
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
174

175
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
176
        iptablesFipSynced      cache.InformerSynced
177
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
178
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
179
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
180

181
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
182
        iptablesDnatRuleSynced      cache.InformerSynced
183
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
184
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
185
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
186

187
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
188
        iptablesSnatRuleSynced      cache.InformerSynced
189
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
190
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
191
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192

193
        ovnEipsLister     kubeovnlister.OvnEipLister
194
        ovnEipSynced      cache.InformerSynced
195
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
196
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
197
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
198
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
199

200
        ovnFipsLister     kubeovnlister.OvnFipLister
201
        ovnFipSynced      cache.InformerSynced
202
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
203
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
204
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
205

206
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
207
        ovnSnatRuleSynced      cache.InformerSynced
208
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
209
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
210
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
211

212
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
213
        ovnDnatRuleSynced      cache.InformerSynced
214
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
215
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
216
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
217

218
        providerNetworksLister kubeovnlister.ProviderNetworkLister
219
        providerNetworkSynced  cache.InformerSynced
220

221
        vlansLister     kubeovnlister.VlanLister
222
        vlanSynced      cache.InformerSynced
223
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
224
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
225
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
226
        vlanKeyMutex    keymutex.KeyMutex
227

228
        namespacesLister  v1.NamespaceLister
229
        namespacesSynced  cache.InformerSynced
230
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
231
        nsKeyMutex        keymutex.KeyMutex
232

233
        nodesLister     v1.NodeLister
234
        nodesSynced     cache.InformerSynced
235
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
236
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
237
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
238
        nodeKeyMutex    keymutex.KeyMutex
239

240
        servicesLister     v1.ServiceLister
241
        serviceSynced      cache.InformerSynced
242
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
243
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
244
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
245
        svcKeyMutex        keymutex.KeyMutex
246

247
        endpointSlicesLister          discoveryv1.EndpointSliceLister
248
        endpointSlicesSynced          cache.InformerSynced
249
        epsIndexer                    cache.Indexer
250
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
251
        epKeyMutex                    keymutex.KeyMutex
252

253
        deploymentsLister appsv1.DeploymentLister
254
        deploymentsSynced cache.InformerSynced
255

256
        npsLister     netv1.NetworkPolicyLister
257
        npsSynced     cache.InformerSynced
258
        npIndexer     cache.Indexer
259
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
260
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
261
        npKeyMutex    keymutex.KeyMutex
262

263
        sgsLister          kubeovnlister.SecurityGroupLister
264
        sgSynced           cache.InformerSynced
265
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
266
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
267
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
268
        sgKeyMutex         keymutex.KeyMutex
269

270
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
271
        qosPolicySynced      cache.InformerSynced
272
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
273
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
274
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
275

276
        configMapsLister v1.ConfigMapLister
277
        configMapsSynced cache.InformerSynced
278

279
        anpsLister     anplister.AdminNetworkPolicyLister
280
        anpsSynced     cache.InformerSynced
281
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
282
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
283
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
284
        anpKeyMutex    keymutex.KeyMutex
285

286
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
287
        dnsNameResolverIndexer          cache.Indexer
288
        dnsNameResolversSynced          cache.InformerSynced
289
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
290
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
291

292
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
293
        banpsSynced     cache.InformerSynced
294
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
295
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
296
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
297
        banpKeyMutex    keymutex.KeyMutex
298

299
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
300
        cnpsSynced     cache.InformerSynced
301
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
302
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
303
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
304
        cnpKeyMutex    keymutex.KeyMutex
305

306
        csrLister           certListerv1.CertificateSigningRequestLister
307
        csrSynced           cache.InformerSynced
308
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
309

310
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
311
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
312
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
313

314
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
315
        netAttachSynced          cache.InformerSynced
316
        netAttachInformerFactory netAttach.SharedInformerFactory
317

318
        serviceCIDRStore           *util.ServiceCIDRStore
319
        serviceCIDRLister          netv1.ServiceCIDRLister
320
        serviceCIDRSynced          cache.InformerSynced
321
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
322

        // Event recorder and shared informer factories.
323
        recorder               record.EventRecorder
324
        informerFactory        kubeinformers.SharedInformerFactory
325
        cmInformerFactory      kubeinformers.SharedInformerFactory
326
        deployInformerFactory  kubeinformers.SharedInformerFactory
327
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
328
        anpInformerFactory     anpinformer.SharedInformerFactory
329

330
        // Database health check
331
        dbFailureCount int
332

333
        distributedSubnetNeedSync atomic.Bool
334
}
335

336
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
337
        if rateLimiter == nil {
2✔
338
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
339
        }
1✔
340
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
341
}
342

343
// Run creates and runs a new ovn controller
344
func Run(ctx context.Context, config *Configuration) {
×
345
        klog.V(4).Info("Creating event broadcaster")
×
346
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
347
        eventBroadcaster.StartLogging(klog.Infof)
×
348
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
349
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
350
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
351
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
352
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
353
        )
×
354

×
355
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
356
        if err != nil {
×
357
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
358
        }
×
359

360
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
361
                kubeinformers.WithTransform(util.TrimPodForController),
×
362
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
363
                        listOption.AllowWatchBookmarks = true
×
364
                }))
×
365
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
366
                kubeinformers.WithNamespace(config.PodNamespace),
×
367
                kubeinformers.WithTransform(util.TrimManagedFields),
×
368
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
369
                        listOption.AllowWatchBookmarks = true
×
370
                }))
×
371
        // deployment informer used to list/watch vpc egress gateway workloads
372
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
373
                kubeinformers.WithTransform(util.TrimManagedFields),
×
374
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
375
                        listOption.AllowWatchBookmarks = true
×
376
                        listOption.LabelSelector = selector.String()
×
377
                }))
×
378
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
379
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
380
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
381
                        listOption.AllowWatchBookmarks = true
×
382
                }))
×
383
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
384
                anpinformer.WithTransform(util.TrimManagedFields),
×
385
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
386
                        listOption.AllowWatchBookmarks = true
×
387
                }))
×
388
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
389
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
390
                        listOption.AllowWatchBookmarks = true
×
391
                }),
×
392
        )
393
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
394
                informer.WithTransform(util.TrimManagedFields),
×
395
        )
×
396
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
397
        // failed list/watch does not contaminate the main informer factory.
×
398
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
399
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
400
                        listOption.AllowWatchBookmarks = true
×
401
                }),
×
402
        )
403

404
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
405
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
406
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
407
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
408
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
409
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
410
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
411
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
412
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
413
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
414
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
415
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
416
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
417
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
418
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
419
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
420
        podInformer := informerFactory.Core().V1().Pods()
×
421
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
422
        nodeInformer := informerFactory.Core().V1().Nodes()
×
423
        serviceInformer := informerFactory.Core().V1().Services()
×
424
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
425
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
426
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
427
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
428
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
429
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
430
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
431
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
432
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
433
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
434
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
435
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
436
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
437
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
438
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
439
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
440
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
441

×
442
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
443
        controller := &Controller{
×
444
                config:             config,
×
445
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
446
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
447
                ipam:               ovnipam.NewIPAM(),
×
448
                namedPort:          NewNamedPort(),
×
449

×
450
                vpcsLister:           vpcInformer.Lister(),
×
451
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
452
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
453
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
454
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
455
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
456
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
457

×
458
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
459
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
460
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
461
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
462
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
463
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
464
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
465
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
466
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
467
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
468
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
469
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
470
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
471
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
472
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
473
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
474
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
475

×
476
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
477
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
478

×
479
                subnetsLister:           subnetInformer.Lister(),
×
480
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
481
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
482
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
483
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
484
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
485
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
486

×
487
                ippoolLister:            ippoolInformer.Lister(),
×
488
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
489
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
490
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
491
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
492
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
493

×
494
                ipsLister:     ipInformer.Lister(),
×
495
                ipSynced:      ipInformer.Informer().HasSynced,
×
496
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
497
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
498
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
499

×
500
                virtualIpsLister:          virtualIPInformer.Lister(),
×
501
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
502
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
503
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
504
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
505
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
506

×
507
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
508
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
509
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
510
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
511
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
512
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
513

×
514
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
515
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
516
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
517
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
518
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
519

×
520
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
521
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
522
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
523
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
524
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
525

×
526
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
527
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
528
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
529
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
530
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
531

×
532
                vlansLister:     vlanInformer.Lister(),
×
533
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
534
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
535
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
536
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
537
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
538

×
539
                providerNetworksLister: providerNetworkInformer.Lister(),
×
540
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
541

×
542
                podsLister:          podInformer.Lister(),
×
543
                podsSynced:          podInformer.Informer().HasSynced,
×
544
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
545
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
546
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
547
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
548
                                Name:          "DeletePod",
×
549
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
550
                        },
×
551
                ),
×
552
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
553
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
554

×
555
                namespacesLister:  namespaceInformer.Lister(),
×
556
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
557
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
558
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
559

×
560
                nodesLister:     nodeInformer.Lister(),
×
561
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
562
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
563
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
564
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
565
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
566

×
567
                servicesLister:     serviceInformer.Lister(),
×
568
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
569
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
570
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
571
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
572
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
573

×
574
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
575
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
576
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
577
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
578

×
579
                deploymentsLister: deploymentInformer.Lister(),
×
580
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
581

×
582
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
583
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
584
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
585
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
586
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
587

×
588
                configMapsLister: configMapInformer.Lister(),
×
589
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
590

×
591
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
592
                sgsLister:          sgInformer.Lister(),
×
593
                sgSynced:           sgInformer.Informer().HasSynced,
×
594
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
595
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
596
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
597

×
598
                ovnEipsLister:     ovnEipInformer.Lister(),
×
599
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
600
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
601
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
602
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
603
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
604

×
605
                ovnFipsLister:     ovnFipInformer.Lister(),
×
606
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
607
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
608
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
609
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
610

×
611
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
612
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
613
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
614
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
615
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
616

×
617
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
618
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
619
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
620
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
621
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
622

×
623
                csrLister:           csrInformer.Lister(),
×
624
                csrSynced:           csrInformer.Informer().HasSynced,
×
625
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
626

×
627
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
628
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
629
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
630

×
631
                netAttachLister:          netAttachInformer.Lister(),
×
632
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
633
                netAttachInformerFactory: attachNetInformerFactory,
×
634

×
635
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
636
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
637

×
638
                recorder:               recorder,
×
639
                informerFactory:        informerFactory,
×
640
                cmInformerFactory:      cmInformerFactory,
×
641
                deployInformerFactory:  deployInformerFactory,
×
642
                kubeovnInformerFactory: kubeovnInformerFactory,
×
643
                anpInformerFactory:     anpInformerFactory,
×
644
        }
×
645

×
646
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
647
                config.OvnNbAddr,
×
648
                config.OvnTimeout,
×
649
                config.OvsDbConnectTimeout,
×
650
                config.OvsDbInactivityTimeout,
×
651
                config.OvsDbConnectMaxRetry,
×
652
        ); err != nil {
×
653
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
654
        }
×
655
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
656
                config.OvnSbAddr,
×
657
                config.OvnTimeout,
×
658
                config.OvsDbConnectTimeout,
×
659
                config.OvsDbInactivityTimeout,
×
660
                config.OvsDbConnectMaxRetry,
×
661
        ); err != nil {
×
662
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
663
        }
×
664
        if config.EnableLb {
×
665
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
666
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
667
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
668
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
669
                        "DeleteSwitchLBRule",
×
670
                        workqueue.NewTypedMaxOfRateLimiter(
×
671
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
672
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
673
                        ),
×
674
                )
×
675
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
676
                        "UpdateSwitchLBRule",
×
677
                        workqueue.NewTypedMaxOfRateLimiter(
×
678
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
679
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
680
                        ),
×
681
                )
×
682

×
683
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
684
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
685
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
686
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
687
        }
×
688

689
        if config.EnableNP {
×
690
                controller.npsLister = npInformer.Lister()
×
691
                controller.npsSynced = npInformer.Informer().HasSynced
×
692
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
693
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
694
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
695
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
696
        }
×
697

698
        if config.EnableANP {
×
699
                controller.anpsLister = anpInformer.Lister()
×
700
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
701
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
702
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
703
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
704
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
705

×
706
                controller.banpsLister = banpInformer.Lister()
×
707
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
708
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
709
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
710
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
711
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
712

×
713
                controller.cnpsLister = cnpInformer.Lister()
×
714
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
715
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
716
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
717
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
718
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
719
        }
×
720

721
        if config.EnableDNSNameResolver {
×
722
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
723
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
724
                if err := dnsNameResolverInformer.Informer().AddIndexers(cache.Indexers{
×
725
                        IndexDNSNameResolverByName: indexDNSNameResolverByName,
×
726
                }); err != nil {
×
727
                        util.LogFatalAndExit(err, "failed to add DNSNameResolver indexer")
×
728
                }
×
729
                controller.dnsNameResolverIndexer = dnsNameResolverInformer.Informer().GetIndexer()
×
730
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
731
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
732
        }
733

734
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer()); err != nil {
×
735
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
736
        }
×
737

738
        defer controller.shutdown()
×
739
        klog.Info("Starting OVN controller")
×
740

×
741
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
742
        // NAD CRD is optional, so we check if it exists before starting the informer
×
743
        controller.StartNetAttachInformerFactory(ctx)
×
744

×
745
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
746
        // don't have the API at all. Best-effort start with periodic retry.
×
747
        controller.StartServiceCIDRInformerFactory(ctx)
×
748

×
749
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
750
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
751
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
752
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
753

×
754
        // Start the informer factories; their caches must be synced (see WaitForCacheSync below) before starting workers
×
755
        controller.informerFactory.Start(ctx.Done())
×
756
        controller.cmInformerFactory.Start(ctx.Done())
×
757
        controller.deployInformerFactory.Start(ctx.Done())
×
758
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
759
        controller.anpInformerFactory.Start(ctx.Done())
×
760
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
761

×
762
        klog.Info("Waiting for informer caches to sync")
×
763
        cacheSyncs := []cache.InformerSynced{
×
764
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
765
                controller.vpcSynced, controller.subnetSynced,
×
766
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
767
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
768
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
769
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
770
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
771
                controller.ovnDnatRuleSynced,
×
772
        }
×
773
        if controller.config.EnableLb {
×
774
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
775
        }
×
776
        if controller.config.EnableNP {
×
777
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
778
        }
×
779
        if controller.config.EnableANP {
×
780
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
781
        }
×
782
        if controller.config.EnableDNSNameResolver {
×
783
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
784
        }
×
785

786
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
787
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
788
        }
×
789

790
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
791
                AddFunc:    controller.enqueueAddPod,
×
792
                DeleteFunc: controller.enqueueDeletePod,
×
793
                UpdateFunc: controller.enqueueUpdatePod,
×
794
        }); err != nil {
×
795
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
796
        }
×
797

798
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
799
                AddFunc:    controller.enqueueAddNamespace,
×
800
                UpdateFunc: controller.enqueueUpdateNamespace,
×
801
                DeleteFunc: controller.enqueueDeleteNamespace,
×
802
        }); err != nil {
×
803
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
804
        }
×
805

806
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
807
                AddFunc:    controller.enqueueAddNode,
×
808
                UpdateFunc: controller.enqueueUpdateNode,
×
809
                DeleteFunc: controller.enqueueDeleteNode,
×
810
        }); err != nil {
×
811
                util.LogFatalAndExit(err, "failed to add node event handler")
×
812
        }
×
813

814
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
815
                AddFunc:    controller.enqueueAddService,
×
816
                DeleteFunc: controller.enqueueDeleteService,
×
817
                UpdateFunc: controller.enqueueUpdateService,
×
818
        }); err != nil {
×
819
                util.LogFatalAndExit(err, "failed to add service event handler")
×
820
        }
×
821

822
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
823
                AddFunc:    controller.enqueueAddEndpointSlice,
×
824
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
825
        }); err != nil {
×
826
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
827
        }
×
828

829
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
830
                AddFunc:    controller.enqueueAddDeployment,
×
831
                UpdateFunc: controller.enqueueUpdateDeployment,
×
832
        }); err != nil {
×
833
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
834
        }
×
835

836
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
837
                AddFunc:    controller.enqueueAddVpc,
×
838
                UpdateFunc: controller.enqueueUpdateVpc,
×
839
                DeleteFunc: controller.enqueueDelVpc,
×
840
        }); err != nil {
×
841
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
842
        }
×
843

844
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
845
                AddFunc:    controller.enqueueAddVpcNatGw,
×
846
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
847
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
848
        }); err != nil {
×
849
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
850
        }
×
851

852
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
853
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
854
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
855
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
856
        }); err != nil {
×
857
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
858
        }
×
859

860
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
861
                AddFunc:    controller.enqueueAddSubnet,
×
862
                UpdateFunc: controller.enqueueUpdateSubnet,
×
863
                DeleteFunc: controller.enqueueDeleteSubnet,
×
864
        }); err != nil {
×
865
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
866
        }
×
867

868
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
869
                AddFunc:    controller.enqueueAddIPPool,
×
870
                UpdateFunc: controller.enqueueUpdateIPPool,
×
871
                DeleteFunc: controller.enqueueDeleteIPPool,
×
872
        }); err != nil {
×
873
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
874
        }
×
875

876
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
877
                AddFunc:    controller.enqueueAddIP,
×
878
                UpdateFunc: controller.enqueueUpdateIP,
×
879
                DeleteFunc: controller.enqueueDelIP,
×
880
        }); err != nil {
×
881
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
882
        }
×
883

884
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
885
                AddFunc:    controller.enqueueAddVlan,
×
886
                DeleteFunc: controller.enqueueDelVlan,
×
887
                UpdateFunc: controller.enqueueUpdateVlan,
×
888
        }); err != nil {
×
889
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
890
        }
×
891

892
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
893
                AddFunc:    controller.enqueueAddSg,
×
894
                DeleteFunc: controller.enqueueDeleteSg,
×
895
                UpdateFunc: controller.enqueueUpdateSg,
×
896
        }); err != nil {
×
897
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
898
        }
×
899

900
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
901
                AddFunc:    controller.enqueueAddVirtualIP,
×
902
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
903
                DeleteFunc: controller.enqueueDelVirtualIP,
×
904
        }); err != nil {
×
905
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
906
        }
×
907

908
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
909
                AddFunc:    controller.enqueueAddIptablesEip,
×
910
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
911
                DeleteFunc: controller.enqueueDelIptablesEip,
×
912
        }); err != nil {
×
913
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
914
        }
×
915

916
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
917
                AddFunc:    controller.enqueueAddIptablesFip,
×
918
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
919
                DeleteFunc: controller.enqueueDelIptablesFip,
×
920
        }); err != nil {
×
921
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
922
        }
×
923

924
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
925
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
926
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
927
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
928
        }); err != nil {
×
929
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
930
        }
×
931

932
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
933
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
934
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
935
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
936
        }); err != nil {
×
937
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
938
        }
×
939

940
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
941
                AddFunc:    controller.enqueueAddOvnEip,
×
942
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
943
                DeleteFunc: controller.enqueueDelOvnEip,
×
944
        }); err != nil {
×
945
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
946
        }
×
947

948
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
949
                AddFunc:    controller.enqueueAddOvnFip,
×
950
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
951
                DeleteFunc: controller.enqueueDelOvnFip,
×
952
        }); err != nil {
×
953
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
954
        }
×
955

956
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
957
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
958
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
959
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
960
        }); err != nil {
×
961
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
962
        }
×
963

964
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
965
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
966
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
967
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
968
        }); err != nil {
×
969
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
970
        }
×
971

972
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
973
                AddFunc:    controller.enqueueAddQoSPolicy,
×
974
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
975
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
976
        }); err != nil {
×
977
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
978
        }
×
979

980
        if config.EnableLb {
×
981
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
982
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
983
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
984
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
985
                }); err != nil {
×
986
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
987
                }
×
988

989
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
990
                        AddFunc:    controller.enqueueAddVpcDNS,
×
991
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
992
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
993
                }); err != nil {
×
994
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
995
                }
×
996
        }
997

998
        if config.EnableNP {
×
999
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1000
                        AddFunc:    controller.enqueueAddNp,
×
1001
                        UpdateFunc: controller.enqueueUpdateNp,
×
1002
                        DeleteFunc: controller.enqueueDeleteNp,
×
1003
                }); err != nil {
×
1004
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
1005
                }
×
1006
        }
1007

1008
        if config.EnableANP {
×
1009
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1010
                        AddFunc:    controller.enqueueAddAnp,
×
1011
                        UpdateFunc: controller.enqueueUpdateAnp,
×
1012
                        DeleteFunc: controller.enqueueDeleteAnp,
×
1013
                }); err != nil {
×
1014
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1015
                }
×
1016

1017
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1018
                        AddFunc:    controller.enqueueAddBanp,
×
1019
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1020
                        DeleteFunc: controller.enqueueDeleteBanp,
×
1021
                }); err != nil {
×
1022
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1023
                }
×
1024

1025
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1026
                        AddFunc:    controller.enqueueAddCnp,
×
1027
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1028
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1029
                }); err != nil {
×
1030
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1031
                }
×
1032

1033
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1034
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1035
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1036
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1037
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1038
        }
1039

1040
        if config.EnableDNSNameResolver {
×
1041
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1042
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1043
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1044
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1045
                }); err != nil {
×
1046
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1047
                }
×
1048
        }
1049

1050
        if config.EnableOVNIPSec {
×
1051
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1052
                        AddFunc:    controller.enqueueAddCsr,
×
1053
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1054
                        // no need to add delete func for csr
×
1055
                }); err != nil {
×
1056
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1057
                }
×
1058
        }
1059

1060
        controller.Run(ctx)
×
1061
}
1062

1063
// Run will set up the event handlers for types we are interested in, as well
1064
// as syncing informer caches and starting workers. It will block until stopCh
1065
// is closed, at which point it will shutdown the workqueue and wait for
1066
// workers to finish processing their current work items.
1067
func (c *Controller) Run(ctx context.Context) {
×
1068
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1069
        // Otherwise, the init process should be placed after all workers have already started working
×
1070
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1071
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1072
        }
×
1073

1074
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1075
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1076
        }
×
1077

1078
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1079
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1080
        }
×
1081

1082
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1083
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1084
        }
×
1085

1086
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1087
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1088
        }
×
1089

1090
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1091
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1092
        }
×
1093

1094
        if err := c.InitOVN(); err != nil {
×
1095
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1096
        }
×
1097

1098
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1099
        if err := c.syncIPCR(); err != nil {
×
1100
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1101
        }
×
1102

1103
        if err := c.syncFinalizers(); err != nil {
×
1104
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1105
        }
×
1106

1107
        if err := c.InitIPAM(); err != nil {
×
1108
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1109
        }
×
1110

1111
        if err := c.syncNodeRoutes(); err != nil {
×
1112
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1113
        }
×
1114

1115
        if err := c.syncSubnetCR(); err != nil {
×
1116
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1117
        }
×
1118

1119
        if err := c.syncVlanCR(); err != nil {
×
1120
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1121
        }
×
1122

1123
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1124
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1125
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1126
                }
×
1127
        }
1128

1129
        // start workers to do all the network operations
1130
        c.startWorkers(ctx)
×
1131

×
1132
        c.initResourceOnce()
×
1133
        <-ctx.Done()
×
1134
        klog.Info("Shutting down workers")
×
1135

×
1136
        c.OVNNbClient.Close()
×
1137
        c.OVNSbClient.Close()
×
1138
}
1139

1140
func (c *Controller) dbStatus() {
×
1141
        const maxFailures = 5
×
1142

×
1143
        done := make(chan error, 2)
×
1144
        go func() {
×
1145
                done <- c.OVNNbClient.Echo(context.Background())
×
1146
        }()
×
1147
        go func() {
×
1148
                done <- c.OVNSbClient.Echo(context.Background())
×
1149
        }()
×
1150

1151
        resultsReceived := 0
×
1152
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1153

×
1154
        for resultsReceived < 2 {
×
1155
                select {
×
1156
                case err := <-done:
×
1157
                        resultsReceived++
×
1158
                        if err != nil {
×
1159
                                c.dbFailureCount++
×
1160
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1161
                                if c.dbFailureCount >= maxFailures {
×
1162
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1163
                                }
×
1164
                                return
×
1165
                        }
1166
                case <-timeout:
×
1167
                        c.dbFailureCount++
×
1168
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1169
                        if c.dbFailureCount >= maxFailures {
×
1170
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1171
                        }
×
1172
                        return
×
1173
                }
1174
        }
1175

1176
        if c.dbFailureCount > 0 {
×
1177
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1178
                c.dbFailureCount = 0
×
1179
        }
×
1180
}
1181

1182
func (c *Controller) shutdown() {
×
1183
        utilruntime.HandleCrash()
×
1184

×
1185
        c.addOrUpdatePodQueue.ShutDown()
×
1186
        c.deletePodQueue.ShutDown()
×
1187
        c.updatePodSecurityQueue.ShutDown()
×
1188

×
1189
        c.addNamespaceQueue.ShutDown()
×
1190

×
1191
        c.addOrUpdateSubnetQueue.ShutDown()
×
1192
        c.deleteSubnetQueue.ShutDown()
×
1193
        c.updateSubnetStatusQueue.ShutDown()
×
1194
        c.syncVirtualPortsQueue.ShutDown()
×
1195

×
1196
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1197
        c.updateIPPoolStatusQueue.ShutDown()
×
1198
        c.deleteIPPoolQueue.ShutDown()
×
1199

×
1200
        c.addNodeQueue.ShutDown()
×
1201
        c.updateNodeQueue.ShutDown()
×
1202
        c.deleteNodeQueue.ShutDown()
×
1203

×
1204
        c.addServiceQueue.ShutDown()
×
1205
        c.deleteServiceQueue.ShutDown()
×
1206
        c.updateServiceQueue.ShutDown()
×
1207
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1208

×
1209
        c.addVlanQueue.ShutDown()
×
1210
        c.delVlanQueue.ShutDown()
×
1211
        c.updateVlanQueue.ShutDown()
×
1212

×
1213
        c.addOrUpdateVpcQueue.ShutDown()
×
1214
        c.updateVpcStatusQueue.ShutDown()
×
1215
        c.delVpcQueue.ShutDown()
×
1216

×
1217
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1218
        c.initVpcNatGatewayQueue.ShutDown()
×
1219
        c.delVpcNatGatewayQueue.ShutDown()
×
1220
        c.updateVpcEipQueue.ShutDown()
×
1221
        c.updateVpcFloatingIPQueue.ShutDown()
×
1222
        c.updateVpcDnatQueue.ShutDown()
×
1223
        c.updateVpcSnatQueue.ShutDown()
×
1224
        c.updateVpcSubnetQueue.ShutDown()
×
1225

×
1226
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1227
        c.delVpcEgressGatewayQueue.ShutDown()
×
1228

×
1229
        if c.config.EnableLb {
×
1230
                c.addSwitchLBRuleQueue.ShutDown()
×
1231
                c.delSwitchLBRuleQueue.ShutDown()
×
1232
                c.updateSwitchLBRuleQueue.ShutDown()
×
1233

×
1234
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1235
                c.delVpcDNSQueue.ShutDown()
×
1236
        }
×
1237

1238
        c.addIPQueue.ShutDown()
×
1239
        c.updateIPQueue.ShutDown()
×
1240
        c.delIPQueue.ShutDown()
×
1241

×
1242
        c.addVirtualIPQueue.ShutDown()
×
1243
        c.updateVirtualIPQueue.ShutDown()
×
1244
        c.updateVirtualParentsQueue.ShutDown()
×
1245
        c.delVirtualIPQueue.ShutDown()
×
1246

×
1247
        c.addIptablesEipQueue.ShutDown()
×
1248
        c.updateIptablesEipQueue.ShutDown()
×
1249
        c.resetIptablesEipQueue.ShutDown()
×
1250
        c.delIptablesEipQueue.ShutDown()
×
1251

×
1252
        c.addIptablesFipQueue.ShutDown()
×
1253
        c.updateIptablesFipQueue.ShutDown()
×
1254
        c.delIptablesFipQueue.ShutDown()
×
1255

×
1256
        c.addIptablesDnatRuleQueue.ShutDown()
×
1257
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1258
        c.delIptablesDnatRuleQueue.ShutDown()
×
1259

×
1260
        c.addIptablesSnatRuleQueue.ShutDown()
×
1261
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1262
        c.delIptablesSnatRuleQueue.ShutDown()
×
1263

×
1264
        c.addQoSPolicyQueue.ShutDown()
×
1265
        c.updateQoSPolicyQueue.ShutDown()
×
1266
        c.delQoSPolicyQueue.ShutDown()
×
1267

×
1268
        c.addOvnEipQueue.ShutDown()
×
1269
        c.updateOvnEipQueue.ShutDown()
×
1270
        c.resetOvnEipQueue.ShutDown()
×
1271
        c.delOvnEipQueue.ShutDown()
×
1272

×
1273
        c.addOvnFipQueue.ShutDown()
×
1274
        c.updateOvnFipQueue.ShutDown()
×
1275
        c.delOvnFipQueue.ShutDown()
×
1276

×
1277
        c.addOvnSnatRuleQueue.ShutDown()
×
1278
        c.updateOvnSnatRuleQueue.ShutDown()
×
1279
        c.delOvnSnatRuleQueue.ShutDown()
×
1280

×
1281
        c.addOvnDnatRuleQueue.ShutDown()
×
1282
        c.updateOvnDnatRuleQueue.ShutDown()
×
1283
        c.delOvnDnatRuleQueue.ShutDown()
×
1284

×
1285
        if c.config.EnableNP {
×
1286
                c.updateNpQueue.ShutDown()
×
1287
                c.deleteNpQueue.ShutDown()
×
1288
        }
×
1289
        if c.config.EnableANP {
×
1290
                c.addAnpQueue.ShutDown()
×
1291
                c.updateAnpQueue.ShutDown()
×
1292
                c.deleteAnpQueue.ShutDown()
×
1293

×
1294
                c.addBanpQueue.ShutDown()
×
1295
                c.updateBanpQueue.ShutDown()
×
1296
                c.deleteBanpQueue.ShutDown()
×
1297

×
1298
                c.addCnpQueue.ShutDown()
×
1299
                c.updateCnpQueue.ShutDown()
×
1300
                c.deleteCnpQueue.ShutDown()
×
1301
        }
×
1302

1303
        if c.config.EnableDNSNameResolver {
×
1304
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1305
                c.deleteDNSNameResolverQueue.ShutDown()
×
1306
        }
×
1307

1308
        c.addOrUpdateSgQueue.ShutDown()
×
1309
        c.delSgQueue.ShutDown()
×
1310
        c.syncSgPortsQueue.ShutDown()
×
1311

×
1312
        c.addOrUpdateCsrQueue.ShutDown()
×
1313

×
1314
        if c.config.EnableLiveMigrationOptimize {
×
1315
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1316
        }
×
1317
}
1318

1319
func (c *Controller) startWorkers(ctx context.Context) {
×
1320
        klog.Info("Starting workers")
×
1321

×
1322
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1323
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1324
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1325

×
1326
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1327
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1330
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1331
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1332
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1334
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1335
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1336
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1337
        // add default and join subnet and wait them ready
×
1338
        for range c.config.WorkerNum {
×
1339
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1340
        }
×
1341
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1344
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1345
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1346
                klog.Infof("wait for subnets %v ready", subnets)
×
1347

×
1348
                return c.allSubnetReady(subnets...)
×
1349
        })
×
1350
        if err != nil {
×
1351
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1352
        }
×
1353

1354
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1355
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1356
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1357

×
1358
        // run node worker before handle any pods
×
1359
        for range c.config.WorkerNum {
×
1360
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1361
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1362
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1363
        }
×
1364
        for {
×
1365
                ready := true
×
1366
                time.Sleep(3 * time.Second)
×
1367
                nodes, err := c.nodesLister.List(labels.Everything())
×
1368
                if err != nil {
×
1369
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1370
                }
×
1371
                for _, node := range nodes {
×
1372
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1373
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1374
                                ready = false
×
1375
                                break
×
1376
                        }
1377
                }
1378
                if ready {
×
1379
                        break
×
1380
                }
1381
        }
1382

1383
        if c.config.EnableLb {
×
1384
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1385
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1386
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1387

×
1388
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1389
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1390
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1391

×
1392
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1393
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1394
                go wait.Until(func() {
×
1395
                        c.resyncVpcDNSConfig()
×
1396
                }, 5*time.Second, ctx.Done())
×
1397
        }
1398

1399
        for range c.config.WorkerNum {
×
1400
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1401
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1402
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1403

×
1404
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1405
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1406
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1407
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1408
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1409

×
1410
                if c.config.EnableLb {
×
1411
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1412
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1413
                }
×
1414

1415
                if c.config.EnableNP {
×
1416
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1417
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1418
                }
×
1419

1420
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1421
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1422
        }
1423

1424
        if c.config.EnableEipSnat {
×
1425
                go wait.Until(func() {
×
1426
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1427
                        c.resyncExternalGateway()
×
1428
                }, time.Second, ctx.Done())
×
1429

1430
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1431
                c.OVNNbClient.MonitorBFD()
×
1432
        }
1433
        // TODO: we should merge these two vpc nat config into one config and resync them together
1434
        go wait.Until(func() {
×
1435
                c.resyncVpcNatGwConfig()
×
1436
        }, time.Second, ctx.Done())
×
1437

1438
        go wait.Until(func() {
×
1439
                c.resyncVpcNatConfig()
×
1440
        }, time.Second, ctx.Done())
×
1441

1442
        if c.config.GCInterval != 0 {
×
1443
                go wait.Until(func() {
×
1444
                        if err := c.markAndCleanLSP(); err != nil {
×
1445
                                klog.Errorf("gc lsp error: %v", err)
×
1446
                        }
×
1447
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1448
        }
1449

1450
        go wait.Until(func() {
×
1451
                if err := c.inspectPod(); err != nil {
×
1452
                        klog.Errorf("inspection error: %v", err)
×
1453
                }
×
1454
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1455

1456
        if c.config.EnableExternalVpc {
×
1457
                go wait.Until(func() {
×
1458
                        c.syncExternalVpc()
×
1459
                }, 5*time.Second, ctx.Done())
×
1460
        }
1461

1462
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1463
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1464
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1465
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1466

×
1467
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1468
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1469
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1470
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1471

×
1472
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1473
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1474
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1475

×
1476
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1477
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1478
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1479

×
1480
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1481
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1482
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1483

×
1484
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1485

×
1486
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1487
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1488
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1489

×
1490
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1491
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1492
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1493
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1494

×
1495
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1496
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1497
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1498
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1499

×
1500
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1501
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1502
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1503

×
1504
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1505
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1506
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1507

×
1508
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1509
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1510
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1511

×
1512
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1513
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1514
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1515

×
1516
        if c.config.EnableANP {
×
1517
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1518
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1519
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1520

×
1521
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1522
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1523
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1524

×
1525
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1526
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1527
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1528
        }
×
1529

1530
        if c.config.EnableDNSNameResolver {
×
1531
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1532
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1533
        }
×
1534

1535
        if c.config.EnableLiveMigrationOptimize {
×
1536
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1537
        }
×
1538

1539
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1540

×
1541
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1542
}
1543

1544
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1545
        for _, lsName := range subnets {
2✔
1546
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1547
                if err != nil {
1✔
1548
                        klog.Error(err)
×
1549
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1550
                }
×
1551

1552
                if !exist {
2✔
1553
                        return false, nil
1✔
1554
                }
1✔
1555
        }
1556

1557
        return true, nil
1✔
1558
}
1559

1560
func (c *Controller) initResourceOnce() {
×
1561
        c.registerSubnetMetrics()
×
1562

×
1563
        if err := c.initNodeChassis(); err != nil {
×
1564
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1565
        }
×
1566

1567
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1568
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1569
        }
×
1570
        if err := c.syncSecurityGroup(); err != nil {
×
1571
                util.LogFatalAndExit(err, "failed to sync security group")
×
1572
        }
×
1573

1574
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1575
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1576
        }
×
1577

1578
        if err := c.initVpcNatGw(); err != nil {
×
1579
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1580
        }
×
1581
        if c.config.EnableLb {
×
1582
                if err := c.initVpcDNSConfig(); err != nil {
×
1583
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1584
                }
×
1585
        }
1586

1587
        // remove resources in ovndb that not exist any more in kubernetes resources
1588
        // process gc at last in case of affecting other init process
1589
        if err := c.gc(); err != nil {
×
1590
                util.LogFatalAndExit(err, "failed to run gc")
×
1591
        }
×
1592
}
1593

1594
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1595
        item, shutdown := queue.Get()
×
1596
        if shutdown {
×
1597
                return false
×
1598
        }
×
1599

1600
        err := func(item T) error {
×
1601
                defer queue.Done(item)
×
1602
                if err := handler(item); err != nil {
×
1603
                        queue.AddRateLimited(item)
×
1604
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1605
                }
×
1606
                queue.Forget(item)
×
1607
                return nil
×
1608
        }(item)
1609
        if err != nil {
×
1610
                utilruntime.HandleError(err)
×
1611
                return true
×
1612
        }
×
1613
        return true
×
1614
}
1615

1616
func getWorkItemKey(obj any) string {
×
1617
        switch v := obj.(type) {
×
1618
        case string:
×
1619
                return v
×
1620
        case *vpcService:
×
1621
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1622
        case *AdminNetworkPolicyChangedDelta:
×
1623
                return v.key
×
1624
        case *SwitchLBRuleInfo:
×
1625
                return v.Name
×
1626
        default:
×
1627
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1628
                if err != nil {
×
1629
                        utilruntime.HandleError(err)
×
1630
                        return ""
×
1631
                }
×
1632
                return key
×
1633
        }
1634
}
1635

1636
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1637
        return func() {
×
1638
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1639
                }
×
1640
        }
1641
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc