• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27389809800

12 Jun 2026 02:05AM UTC coverage: 26.344% (+0.3%) from 26.068%
27389809800

push

github

web-flow
feat(controller): add RouterLBRule CRD for L3/L4 load balancing on VPC router (#6563)

* feat(controller): add RouterLBRule CRD for L3/L4 load balancing on VPC router

Signed-off-by: Andrii Shestakov <andrii.shestakov@gcore.com>

* fixes

Signed-off-by: Andrii Shestakov <andrii.shestakov@gcore.com>

---------

Signed-off-by: Andrii Shestakov <andrii.shestakov@gcore.com>
Co-authored-by: Andrii Shestakov <andrii.shestakov@gcore.com>

330 of 710 new or added lines in 13 files covered. (46.48%)

1 existing line in 1 file now uncovered.

15449 of 58644 relevant lines covered (26.34%)

0.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.08
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        kubeinformers "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        appsv1 "k8s.io/client-go/listers/apps/v1"
25
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
26
        v1 "k8s.io/client-go/listers/core/v1"
27
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
28
        netv1 "k8s.io/client-go/listers/networking/v1"
29
        "k8s.io/client-go/tools/cache"
30
        "k8s.io/client-go/tools/record"
31
        "k8s.io/client-go/util/workqueue"
32
        "k8s.io/klog/v2"
33
        "k8s.io/utils/keymutex"
34
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
35
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
36
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
37
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
38
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
39

40
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
41
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
42
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
43
        "github.com/kubeovn/kube-ovn/pkg/informer"
44
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
45
        "github.com/kubeovn/kube-ovn/pkg/ovs"
46
        "github.com/kubeovn/kube-ovn/pkg/util"
47
)
48

49
const controllerAgentName = "kube-ovn-controller"
50

51
const (
52
        logicalSwitchKey              = "ls"
53
        logicalRouterKey              = "lr"
54
        portGroupKey                  = "pg"
55
        networkPolicyKey              = "np"
56
        sgKey                         = "sg"
57
        sgsKey                        = "security_groups"
58
        u2oKey                        = "u2o"
59
        adminNetworkPolicyKey         = "anp"
60
        baselineAdminNetworkPolicyKey = "banp"
61
        ippoolKey                     = "ippool"
62
        clusterNetworkPolicyKey       = "cnp"
63
)
64

65
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
66
type Controller struct {
67
        config *Configuration
68

69
        ipam             *ovnipam.IPAM
70
        namedPort        *NamedPort
71
        anpPrioNameMap   map[int32]string
72
        anpNamePrioMap   map[string]int32
73
        bnpPrioNameMap   map[int32]string
74
        bnpNamePrioMap   map[string]int32
75
        priorityMapMutex sync.RWMutex
76

77
        OVNNbClient ovs.NbClient
78
        OVNSbClient ovs.SbClient
79

80
        // ExternalGatewayType define external gateway type, centralized
81
        ExternalGatewayType string
82

83
        podsLister             v1.PodLister
84
        podsSynced             cache.InformerSynced
85
        podIndexer             cache.Indexer
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        // bgpConfLister/evpnConfLister are published asynchronously by the
121
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
122
        // read by VEG worker goroutines. Atomic pointers keep the read/write
123
        // race-free without locking the hot reconcile path.
124
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
125
        bgpConfSynced  cache.InformerSynced
126
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
127
        evpnConfSynced cache.InformerSynced
128

129
        routerLBRuleLister      kubeovnlister.RouterLBRuleLister
130
        routerLBRuleSynced      cache.InformerSynced
131
        addRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
132
        updateRouterLBRuleQueue workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
133
        delRouterLBRuleQueue    workqueue.TypedRateLimitingInterface[*RouterLBRuleInfo]
134

135
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
136
        switchLBRuleSynced      cache.InformerSynced
137
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
138
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
139
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
140

141
        vpcDNSLister           kubeovnlister.VpcDnsLister
142
        vpcDNSSynced           cache.InformerSynced
143
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
144
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
145

146
        subnetsLister           kubeovnlister.SubnetLister
147
        subnetSynced            cache.InformerSynced
148
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
149
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
150
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
151
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
152
        subnetKeyMutex          keymutex.KeyMutex
153

154
        ippoolLister            kubeovnlister.IPPoolLister
155
        ippoolSynced            cache.InformerSynced
156
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
157
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
158
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
159
        ippoolKeyMutex          keymutex.KeyMutex
160

161
        ipsLister     kubeovnlister.IPLister
162
        ipSynced      cache.InformerSynced
163
        ipIndexer     cache.Indexer
164
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
165
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
166
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
167

168
        virtualIpsLister          kubeovnlister.VipLister
169
        virtualIpsSynced          cache.InformerSynced
170
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
171
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
172
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
173
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
174

175
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
176
        iptablesEipSynced      cache.InformerSynced
177
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
178
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
179
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
180
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
181

182
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
183
        iptablesFipSynced      cache.InformerSynced
184
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
185
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
186
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
187

188
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
189
        iptablesDnatRuleSynced      cache.InformerSynced
190
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
191
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
192
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
193

194
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
195
        iptablesSnatRuleSynced      cache.InformerSynced
196
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
197
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
198
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
199

200
        ovnEipsLister     kubeovnlister.OvnEipLister
201
        ovnEipSynced      cache.InformerSynced
202
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
203
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
204
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
205
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
206

207
        ovnFipsLister     kubeovnlister.OvnFipLister
208
        ovnFipSynced      cache.InformerSynced
209
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
210
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
211
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
212

213
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
214
        ovnSnatRuleSynced      cache.InformerSynced
215
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
216
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
217
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
218

219
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
220
        ovnDnatRuleSynced      cache.InformerSynced
221
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
222
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
223
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
224

225
        providerNetworksLister kubeovnlister.ProviderNetworkLister
226
        providerNetworkSynced  cache.InformerSynced
227

228
        vlansLister     kubeovnlister.VlanLister
229
        vlanSynced      cache.InformerSynced
230
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
231
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
232
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
233
        vlanKeyMutex    keymutex.KeyMutex
234

235
        namespacesLister  v1.NamespaceLister
236
        namespacesSynced  cache.InformerSynced
237
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
238
        nsKeyMutex        keymutex.KeyMutex
239

240
        nodesLister     v1.NodeLister
241
        nodesSynced     cache.InformerSynced
242
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
243
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
244
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
245
        nodeKeyMutex    keymutex.KeyMutex
246

247
        servicesLister     v1.ServiceLister
248
        serviceSynced      cache.InformerSynced
249
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
250
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
251
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
252
        svcKeyMutex        keymutex.KeyMutex
253

254
        endpointSlicesLister          discoveryv1.EndpointSliceLister
255
        endpointSlicesSynced          cache.InformerSynced
256
        epsIndexer                    cache.Indexer
257
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
258
        epKeyMutex                    keymutex.KeyMutex
259

260
        deploymentsLister appsv1.DeploymentLister
261
        deploymentsSynced cache.InformerSynced
262

263
        npsLister     netv1.NetworkPolicyLister
264
        npsSynced     cache.InformerSynced
265
        npIndexer     cache.Indexer
266
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
267
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
268
        npKeyMutex    keymutex.KeyMutex
269

270
        sgsLister          kubeovnlister.SecurityGroupLister
271
        sgSynced           cache.InformerSynced
272
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
273
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
274
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
275
        sgKeyMutex         keymutex.KeyMutex
276

277
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
278
        qosPolicySynced      cache.InformerSynced
279
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
280
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
281
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
282

283
        configMapsLister v1.ConfigMapLister
284
        configMapsSynced cache.InformerSynced
285

286
        anpsLister     anplister.AdminNetworkPolicyLister
287
        anpsSynced     cache.InformerSynced
288
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
289
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
290
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
291
        anpKeyMutex    keymutex.KeyMutex
292

293
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
294
        dnsNameResolverIndexer          cache.Indexer
295
        dnsNameResolversSynced          cache.InformerSynced
296
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
297
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
298

299
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
300
        banpsSynced     cache.InformerSynced
301
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
302
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
303
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
304
        banpKeyMutex    keymutex.KeyMutex
305

306
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
307
        cnpsSynced     cache.InformerSynced
308
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
309
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
310
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
311
        cnpKeyMutex    keymutex.KeyMutex
312

313
        csrLister           certListerv1.CertificateSigningRequestLister
314
        csrSynced           cache.InformerSynced
315
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
316

317
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
318
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
319
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
320

321
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
322
        netAttachSynced          cache.InformerSynced
323
        netAttachInformerFactory netAttach.SharedInformerFactory
324

325
        serviceCIDRStore           *util.ServiceCIDRStore
326
        serviceCIDRLister          netv1.ServiceCIDRLister
327
        serviceCIDRSynced          cache.InformerSynced
328
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
329

330
        recorder               record.EventRecorder
331
        informerFactory        kubeinformers.SharedInformerFactory
332
        cmInformerFactory      kubeinformers.SharedInformerFactory
333
        deployInformerFactory  kubeinformers.SharedInformerFactory
334
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
335
        anpInformerFactory     anpinformer.SharedInformerFactory
336

337
        // Database health check
338
        dbFailureCount int
339

340
        distributedSubnetNeedSync atomic.Bool
341
}
342

343
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
344
        if rateLimiter == nil {
2✔
345
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
346
        }
1✔
347
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
348
}
349

350
// Run creates and runs a new ovn controller
351
func Run(ctx context.Context, config *Configuration) {
×
352
        klog.V(4).Info("Creating event broadcaster")
×
353
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
354
        eventBroadcaster.StartLogging(klog.Infof)
×
355
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
356
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
357
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
358
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
359
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
360
        )
×
361

×
362
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
363
        if err != nil {
×
364
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
365
        }
×
366

367
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
368
                kubeinformers.WithTransform(util.TrimPodForController),
×
369
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
370
                        listOption.AllowWatchBookmarks = true
×
371
                }))
×
372
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
373
                kubeinformers.WithNamespace(config.PodNamespace),
×
374
                kubeinformers.WithTransform(util.TrimManagedFields),
×
375
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
376
                        listOption.AllowWatchBookmarks = true
×
377
                }))
×
378
        // deployment informer used to list/watch vpc egress gateway workloads
379
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
380
                kubeinformers.WithTransform(util.TrimManagedFields),
×
381
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
382
                        listOption.AllowWatchBookmarks = true
×
383
                        listOption.LabelSelector = selector.String()
×
384
                }))
×
385
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
386
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
387
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
388
                        listOption.AllowWatchBookmarks = true
×
389
                }))
×
390
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
391
                anpinformer.WithTransform(util.TrimManagedFields),
×
392
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
393
                        listOption.AllowWatchBookmarks = true
×
394
                }))
×
395
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
396
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
397
                        listOption.AllowWatchBookmarks = true
×
398
                }),
×
399
        )
400
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
401
                informer.WithTransform(util.TrimManagedFields),
×
402
        )
×
403
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
404
        // failed list/watch does not contaminate the main informer factory.
×
405
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
406
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
407
                        listOption.AllowWatchBookmarks = true
×
408
                }),
×
409
        )
410

411
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
412
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
413
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
414
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
415
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
416
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
417
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
418
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
419
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
420
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
421
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
422
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
423
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
424
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
425
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
426
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
427
        podInformer := informerFactory.Core().V1().Pods()
×
428
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
429
        nodeInformer := informerFactory.Core().V1().Nodes()
×
430
        serviceInformer := informerFactory.Core().V1().Services()
×
431
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
432
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
433
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
434
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
435
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
NEW
436
        routerLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().RouterLBRules()
×
437
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
438
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
439
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
440
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
441
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
442
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
443
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
444
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
445
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
446
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
447
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
448
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
449

×
450
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
451
        controller := &Controller{
×
452
                config:             config,
×
453
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
454
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
455
                ipam:               ovnipam.NewIPAM(),
×
456
                namedPort:          NewNamedPort(),
×
457

×
458
                vpcsLister:           vpcInformer.Lister(),
×
459
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
460
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
461
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
462
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
463
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
464
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
465

×
466
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
467
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
468
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
469
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
470
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
471
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
472
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
473
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
474
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
475
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
476
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
477
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
478
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
479
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
480
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
481
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
482
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
483

×
484
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
485
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
486

×
487
                subnetsLister:           subnetInformer.Lister(),
×
488
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
489
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
490
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
491
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
492
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
493
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
494

×
495
                ippoolLister:            ippoolInformer.Lister(),
×
496
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
497
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
498
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
499
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
500
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
501

×
502
                ipsLister:     ipInformer.Lister(),
×
503
                ipSynced:      ipInformer.Informer().HasSynced,
×
504
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
505
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
506
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
507

×
508
                virtualIpsLister:          virtualIPInformer.Lister(),
×
509
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
510
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
511
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
512
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
513
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
514

×
515
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
516
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
517
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
518
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
519
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
520
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
521

×
522
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
523
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
524
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
525
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
526
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
527

×
528
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
529
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
530
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
531
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
532
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
533

×
534
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
535
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
536
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
537
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
538
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
539

×
540
                vlansLister:     vlanInformer.Lister(),
×
541
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
542
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
543
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
544
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
545
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
546

×
547
                providerNetworksLister: providerNetworkInformer.Lister(),
×
548
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
549

×
550
                podsLister:          podInformer.Lister(),
×
551
                podsSynced:          podInformer.Informer().HasSynced,
×
552
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
553
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
554
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
555
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
556
                                Name:          "DeletePod",
×
557
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
558
                        },
×
559
                ),
×
560
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
561
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
562

×
563
                namespacesLister:  namespaceInformer.Lister(),
×
564
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
565
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
566
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
567

×
568
                nodesLister:     nodeInformer.Lister(),
×
569
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
570
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
571
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
572
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
573
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
574

×
575
                servicesLister:     serviceInformer.Lister(),
×
576
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
577
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
578
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
579
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
580
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
581

×
582
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
583
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
584
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
585
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
586

×
587
                deploymentsLister: deploymentInformer.Lister(),
×
588
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
589

×
590
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
591
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
592
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
593
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
594
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
595

×
596
                configMapsLister: configMapInformer.Lister(),
×
597
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
598

×
599
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
600
                sgsLister:          sgInformer.Lister(),
×
601
                sgSynced:           sgInformer.Informer().HasSynced,
×
602
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
603
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
604
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
605

×
606
                ovnEipsLister:     ovnEipInformer.Lister(),
×
607
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
608
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
609
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
610
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
611
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
612

×
613
                ovnFipsLister:     ovnFipInformer.Lister(),
×
614
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
615
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
616
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
617
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
618

×
619
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
620
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
621
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
622
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
623
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
624

×
625
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
626
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
627
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
628
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
629
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
630

×
631
                csrLister:           csrInformer.Lister(),
×
632
                csrSynced:           csrInformer.Informer().HasSynced,
×
633
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
634

×
635
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
636
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
637
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
638

×
639
                netAttachLister:          netAttachInformer.Lister(),
×
640
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
641
                netAttachInformerFactory: attachNetInformerFactory,
×
642

×
643
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
644
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
645

×
646
                recorder:               recorder,
×
647
                informerFactory:        informerFactory,
×
648
                cmInformerFactory:      cmInformerFactory,
×
649
                deployInformerFactory:  deployInformerFactory,
×
650
                kubeovnInformerFactory: kubeovnInformerFactory,
×
651
                anpInformerFactory:     anpInformerFactory,
×
652
        }
×
653

×
654
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
655
                config.OvnNbAddr,
×
656
                config.OvnTimeout,
×
657
                config.OvsDbConnectTimeout,
×
658
                config.OvsDbInactivityTimeout,
×
659
                config.OvsDbConnectMaxRetry,
×
660
        ); err != nil {
×
661
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
662
        }
×
663
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
664
                config.OvnSbAddr,
×
665
                config.OvnTimeout,
×
666
                config.OvsDbConnectTimeout,
×
667
                config.OvsDbInactivityTimeout,
×
668
                config.OvsDbConnectMaxRetry,
×
669
        ); err != nil {
×
670
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
671
        }
×
672
        if config.EnableLb {
×
NEW
673
                controller.routerLBRuleLister = routerLBRuleInformer.Lister()
×
NEW
674
                controller.routerLBRuleSynced = routerLBRuleInformer.Informer().HasSynced
×
NEW
675
                controller.addRouterLBRuleQueue = newTypedRateLimitingQueue("AddRouterLBRule", custCrdRateLimiter)
×
NEW
676
                controller.delRouterLBRuleQueue = newTypedRateLimitingQueue(
×
NEW
677
                        "DeleteRouterLBRule",
×
NEW
678
                        workqueue.NewTypedMaxOfRateLimiter(
×
NEW
679
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
NEW
680
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
NEW
681
                        ),
×
NEW
682
                )
×
NEW
683
                controller.updateRouterLBRuleQueue = newTypedRateLimitingQueue(
×
NEW
684
                        "UpdateRouterLBRule",
×
NEW
685
                        workqueue.NewTypedMaxOfRateLimiter(
×
NEW
686
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*RouterLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
NEW
687
                                &workqueue.TypedBucketRateLimiter[*RouterLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
NEW
688
                        ),
×
NEW
689
                )
×
NEW
690

×
691
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
692
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
693
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
694
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
695
                        "DeleteSwitchLBRule",
×
696
                        workqueue.NewTypedMaxOfRateLimiter(
×
697
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
698
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
699
                        ),
×
700
                )
×
701
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
702
                        "UpdateSwitchLBRule",
×
703
                        workqueue.NewTypedMaxOfRateLimiter(
×
704
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
705
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
706
                        ),
×
707
                )
×
708

×
709
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
710
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
711
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
712
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
713
        }
×
714

715
        if config.EnableNP {
×
716
                controller.npsLister = npInformer.Lister()
×
717
                controller.npsSynced = npInformer.Informer().HasSynced
×
718
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
719
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
720
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
721
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
722
        }
×
723

724
        if config.EnableANP {
×
725
                controller.anpsLister = anpInformer.Lister()
×
726
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
727
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
728
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
729
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
730
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
731

×
732
                controller.banpsLister = banpInformer.Lister()
×
733
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
734
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
735
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
736
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
737
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
738

×
739
                controller.cnpsLister = cnpInformer.Lister()
×
740
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
741
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
742
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
743
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
744
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
745
        }
×
746

747
        if config.EnableDNSNameResolver {
×
748
                if !config.EnableANP {
×
749
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
750
                }
×
751
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
752
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
753
                if err := dnsNameResolverInformer.Informer().AddIndexers(cache.Indexers{
×
754
                        IndexDNSNameResolverByName: indexDNSNameResolverByName,
×
755
                }); err != nil {
×
756
                        util.LogFatalAndExit(err, "failed to add DNSNameResolver indexer")
×
757
                }
×
758
                controller.dnsNameResolverIndexer = dnsNameResolverInformer.Informer().GetIndexer()
×
759
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
760
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
761
        }
762

763
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer(), ipInformer.Informer()); err != nil {
×
764
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
765
        }
×
766

767
        defer controller.shutdown()
×
768
        klog.Info("Starting OVN controller")
×
769

×
770
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
771
        // NAD CRD is optional, so we check if it exists before starting the informer
×
772
        controller.StartNetAttachInformerFactory(ctx)
×
773

×
774
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
775
        // don't have the API at all. Best-effort start with periodic retry.
×
776
        controller.StartServiceCIDRInformerFactory(ctx)
×
777

×
778
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
779
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
780
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
781
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
782

×
783
        // Wait for the caches to be synced before starting workers
×
784
        controller.informerFactory.Start(ctx.Done())
×
785
        controller.cmInformerFactory.Start(ctx.Done())
×
786
        controller.deployInformerFactory.Start(ctx.Done())
×
787
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
788
        controller.anpInformerFactory.Start(ctx.Done())
×
789
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
790

×
791
        klog.Info("Waiting for informer caches to sync")
×
792
        cacheSyncs := []cache.InformerSynced{
×
793
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
794
                controller.vpcSynced, controller.subnetSynced,
×
795
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
796
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
797
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
798
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
799
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
800
                controller.ovnDnatRuleSynced,
×
801
        }
×
802
        if controller.config.EnableLb {
×
NEW
803
                cacheSyncs = append(cacheSyncs, controller.routerLBRuleSynced, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
804
        }
×
805
        if controller.config.EnableNP {
×
806
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
807
        }
×
808
        if controller.config.EnableANP {
×
809
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
810
        }
×
811
        if controller.config.EnableDNSNameResolver {
×
812
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
813
        }
×
814

815
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
816
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
817
        }
×
818

819
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
820
                AddFunc:    controller.enqueueAddPod,
×
821
                DeleteFunc: controller.enqueueDeletePod,
×
822
                UpdateFunc: controller.enqueueUpdatePod,
×
823
        }); err != nil {
×
824
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
825
        }
×
826

827
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
828
                AddFunc:    controller.enqueueAddNamespace,
×
829
                UpdateFunc: controller.enqueueUpdateNamespace,
×
830
                DeleteFunc: controller.enqueueDeleteNamespace,
×
831
        }); err != nil {
×
832
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
833
        }
×
834

835
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
836
                AddFunc:    controller.enqueueAddNode,
×
837
                UpdateFunc: controller.enqueueUpdateNode,
×
838
                DeleteFunc: controller.enqueueDeleteNode,
×
839
        }); err != nil {
×
840
                util.LogFatalAndExit(err, "failed to add node event handler")
×
841
        }
×
842

843
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
844
                AddFunc:    controller.enqueueAddService,
×
845
                DeleteFunc: controller.enqueueDeleteService,
×
846
                UpdateFunc: controller.enqueueUpdateService,
×
847
        }); err != nil {
×
848
                util.LogFatalAndExit(err, "failed to add service event handler")
×
849
        }
×
850

851
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
852
                AddFunc:    controller.enqueueAddEndpointSlice,
×
853
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
854
        }); err != nil {
×
855
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
856
        }
×
857

858
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
859
                AddFunc:    controller.enqueueAddDeployment,
×
860
                UpdateFunc: controller.enqueueUpdateDeployment,
×
861
        }); err != nil {
×
862
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
863
        }
×
864

865
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
866
                AddFunc:    controller.enqueueAddVpc,
×
867
                UpdateFunc: controller.enqueueUpdateVpc,
×
868
                DeleteFunc: controller.enqueueDelVpc,
×
869
        }); err != nil {
×
870
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
871
        }
×
872

873
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
874
                AddFunc:    controller.enqueueAddVpcNatGw,
×
875
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
876
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
877
        }); err != nil {
×
878
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
879
        }
×
880

881
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
882
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
883
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
884
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
885
        }); err != nil {
×
886
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
887
        }
×
888

889
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
890
                AddFunc:    controller.enqueueAddSubnet,
×
891
                UpdateFunc: controller.enqueueUpdateSubnet,
×
892
                DeleteFunc: controller.enqueueDeleteSubnet,
×
893
        }); err != nil {
×
894
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
895
        }
×
896

897
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
898
                AddFunc:    controller.enqueueAddIPPool,
×
899
                UpdateFunc: controller.enqueueUpdateIPPool,
×
900
                DeleteFunc: controller.enqueueDeleteIPPool,
×
901
        }); err != nil {
×
902
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
903
        }
×
904

905
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
906
                AddFunc:    controller.enqueueAddIP,
×
907
                UpdateFunc: controller.enqueueUpdateIP,
×
908
                DeleteFunc: controller.enqueueDelIP,
×
909
        }); err != nil {
×
910
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
911
        }
×
912

913
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
914
                AddFunc:    controller.enqueueAddVlan,
×
915
                DeleteFunc: controller.enqueueDelVlan,
×
916
                UpdateFunc: controller.enqueueUpdateVlan,
×
917
        }); err != nil {
×
918
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
919
        }
×
920

921
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
922
                AddFunc:    controller.enqueueAddSg,
×
923
                DeleteFunc: controller.enqueueDeleteSg,
×
924
                UpdateFunc: controller.enqueueUpdateSg,
×
925
        }); err != nil {
×
926
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
927
        }
×
928

929
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                AddFunc:    controller.enqueueAddVirtualIP,
×
931
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
932
                DeleteFunc: controller.enqueueDelVirtualIP,
×
933
        }); err != nil {
×
934
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
935
        }
×
936

937
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
938
                AddFunc:    controller.enqueueAddIptablesEip,
×
939
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
940
                DeleteFunc: controller.enqueueDelIptablesEip,
×
941
        }); err != nil {
×
942
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
943
        }
×
944

945
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
946
                AddFunc:    controller.enqueueAddIptablesFip,
×
947
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
948
                DeleteFunc: controller.enqueueDelIptablesFip,
×
949
        }); err != nil {
×
950
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
951
        }
×
952

953
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
954
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
955
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
956
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
957
        }); err != nil {
×
958
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
959
        }
×
960

961
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
962
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
963
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
964
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
965
        }); err != nil {
×
966
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
967
        }
×
968

969
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
970
                AddFunc:    controller.enqueueAddOvnEip,
×
971
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
972
                DeleteFunc: controller.enqueueDelOvnEip,
×
973
        }); err != nil {
×
974
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
975
        }
×
976

977
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
978
                AddFunc:    controller.enqueueAddOvnFip,
×
979
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
980
                DeleteFunc: controller.enqueueDelOvnFip,
×
981
        }); err != nil {
×
982
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
983
        }
×
984

985
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
986
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
987
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
988
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
989
        }); err != nil {
×
990
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
991
        }
×
992

993
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
994
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
995
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
996
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
997
        }); err != nil {
×
998
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
999
        }
×
1000

1001
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1002
                AddFunc:    controller.enqueueAddQoSPolicy,
×
1003
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
1004
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
1005
        }); err != nil {
×
1006
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
1007
        }
×
1008

1009
        if config.EnableLb {
×
NEW
1010
                if _, err = routerLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
NEW
1011
                        AddFunc:    controller.enqueueAddRouterLBRule,
×
NEW
1012
                        UpdateFunc: controller.enqueueUpdateRouterLBRule,
×
NEW
1013
                        DeleteFunc: controller.enqueueDeleteRouterLBRule,
×
NEW
1014
                }); err != nil {
×
NEW
1015
                        util.LogFatalAndExit(err, "failed to add router lb rule event handler")
×
NEW
1016
                }
×
1017

1018
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1019
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
1020
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
1021
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
1022
                }); err != nil {
×
1023
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
1024
                }
×
1025

1026
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1027
                        AddFunc:    controller.enqueueAddVpcDNS,
×
1028
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
1029
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
1030
                }); err != nil {
×
1031
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
1032
                }
×
1033
        }
1034

1035
        if config.EnableNP {
×
1036
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1037
                        AddFunc:    controller.enqueueAddNp,
×
1038
                        UpdateFunc: controller.enqueueUpdateNp,
×
1039
                        DeleteFunc: controller.enqueueDeleteNp,
×
1040
                }); err != nil {
×
1041
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
1042
                }
×
1043
        }
1044

1045
        if config.EnableANP {
×
1046
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1047
                        AddFunc:    controller.enqueueAddAnp,
×
1048
                        UpdateFunc: controller.enqueueUpdateAnp,
×
1049
                        DeleteFunc: controller.enqueueDeleteAnp,
×
1050
                }); err != nil {
×
1051
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1052
                }
×
1053

1054
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1055
                        AddFunc:    controller.enqueueAddBanp,
×
1056
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1057
                        DeleteFunc: controller.enqueueDeleteBanp,
×
1058
                }); err != nil {
×
1059
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1060
                }
×
1061

1062
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1063
                        AddFunc:    controller.enqueueAddCnp,
×
1064
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1065
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1066
                }); err != nil {
×
1067
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1068
                }
×
1069

1070
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1071
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1072
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1073
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1074
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1075
        }
1076

1077
        if config.EnableDNSNameResolver {
×
1078
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1079
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1080
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1081
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1082
                }); err != nil {
×
1083
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1084
                }
×
1085
        }
1086

1087
        if config.EnableOVNIPSec {
×
1088
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1089
                        AddFunc:    controller.enqueueAddCsr,
×
1090
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1091
                        // no need to add delete func for csr
×
1092
                }); err != nil {
×
1093
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1094
                }
×
1095
        }
1096

1097
        controller.Run(ctx)
×
1098
}
1099

1100
// Run will set up the event handlers for types we are interested in, as well
1101
// as syncing informer caches and starting workers. It will block until stopCh
1102
// is closed, at which point it will shutdown the workqueue and wait for
1103
// workers to finish processing their current work items.
1104
func (c *Controller) Run(ctx context.Context) {
×
1105
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1106
        // Otherwise, the init process should be placed after all workers have already started working
×
1107
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1108
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1109
        }
×
1110

1111
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1112
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1113
        }
×
1114

1115
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1116
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1117
        }
×
1118

1119
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1120
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1121
        }
×
1122

1123
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1124
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1125
        }
×
1126

1127
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1128
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1129
        }
×
1130

1131
        if err := c.InitOVN(); err != nil {
×
1132
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1133
        }
×
1134

1135
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1136
        if err := c.syncIPCR(); err != nil {
×
1137
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1138
        }
×
1139

1140
        if err := c.syncFinalizers(); err != nil {
×
1141
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1142
        }
×
1143

1144
        if err := c.InitIPAM(); err != nil {
×
1145
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1146
        }
×
1147

1148
        if err := c.syncNodeRoutes(); err != nil {
×
1149
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1150
        }
×
1151

1152
        if err := c.syncSubnetCR(); err != nil {
×
1153
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1154
        }
×
1155

1156
        if err := c.syncVlanCR(); err != nil {
×
1157
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1158
        }
×
1159

1160
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1161
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1162
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1163
                }
×
1164
        }
1165

1166
        // start workers to do all the network operations
1167
        c.startWorkers(ctx)
×
1168

×
1169
        c.initResourceOnce()
×
1170
        <-ctx.Done()
×
1171
        klog.Info("Shutting down workers")
×
1172

×
1173
        c.OVNNbClient.Close()
×
1174
        c.OVNSbClient.Close()
×
1175
}
1176

1177
func (c *Controller) dbStatus() {
×
1178
        const maxFailures = 5
×
1179

×
1180
        done := make(chan error, 2)
×
1181
        go func() {
×
1182
                done <- c.OVNNbClient.Echo(context.Background())
×
1183
        }()
×
1184
        go func() {
×
1185
                done <- c.OVNSbClient.Echo(context.Background())
×
1186
        }()
×
1187

1188
        resultsReceived := 0
×
1189
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1190

×
1191
        for resultsReceived < 2 {
×
1192
                select {
×
1193
                case err := <-done:
×
1194
                        resultsReceived++
×
1195
                        if err != nil {
×
1196
                                c.dbFailureCount++
×
1197
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1198
                                if c.dbFailureCount >= maxFailures {
×
1199
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1200
                                }
×
1201
                                return
×
1202
                        }
1203
                case <-timeout:
×
1204
                        c.dbFailureCount++
×
1205
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1206
                        if c.dbFailureCount >= maxFailures {
×
1207
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1208
                        }
×
1209
                        return
×
1210
                }
1211
        }
1212

1213
        if c.dbFailureCount > 0 {
×
1214
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1215
                c.dbFailureCount = 0
×
1216
        }
×
1217
}
1218

1219
func (c *Controller) shutdown() {
×
1220
        utilruntime.HandleCrash()
×
1221

×
1222
        c.addOrUpdatePodQueue.ShutDown()
×
1223
        c.deletePodQueue.ShutDown()
×
1224
        c.updatePodSecurityQueue.ShutDown()
×
1225

×
1226
        c.addNamespaceQueue.ShutDown()
×
1227

×
1228
        c.addOrUpdateSubnetQueue.ShutDown()
×
1229
        c.deleteSubnetQueue.ShutDown()
×
1230
        c.updateSubnetStatusQueue.ShutDown()
×
1231
        c.syncVirtualPortsQueue.ShutDown()
×
1232

×
1233
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1234
        c.updateIPPoolStatusQueue.ShutDown()
×
1235
        c.deleteIPPoolQueue.ShutDown()
×
1236

×
1237
        c.addNodeQueue.ShutDown()
×
1238
        c.updateNodeQueue.ShutDown()
×
1239
        c.deleteNodeQueue.ShutDown()
×
1240

×
1241
        c.addServiceQueue.ShutDown()
×
1242
        c.deleteServiceQueue.ShutDown()
×
1243
        c.updateServiceQueue.ShutDown()
×
1244
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1245

×
1246
        c.addVlanQueue.ShutDown()
×
1247
        c.delVlanQueue.ShutDown()
×
1248
        c.updateVlanQueue.ShutDown()
×
1249

×
1250
        c.addOrUpdateVpcQueue.ShutDown()
×
1251
        c.updateVpcStatusQueue.ShutDown()
×
1252
        c.delVpcQueue.ShutDown()
×
1253

×
1254
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1255
        c.initVpcNatGatewayQueue.ShutDown()
×
1256
        c.delVpcNatGatewayQueue.ShutDown()
×
1257
        c.updateVpcEipQueue.ShutDown()
×
1258
        c.updateVpcFloatingIPQueue.ShutDown()
×
1259
        c.updateVpcDnatQueue.ShutDown()
×
1260
        c.updateVpcSnatQueue.ShutDown()
×
1261
        c.updateVpcSubnetQueue.ShutDown()
×
1262

×
1263
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1264
        c.delVpcEgressGatewayQueue.ShutDown()
×
1265

×
1266
        if c.config.EnableLb {
×
NEW
1267
                c.addRouterLBRuleQueue.ShutDown()
×
NEW
1268
                c.delRouterLBRuleQueue.ShutDown()
×
NEW
1269
                c.updateRouterLBRuleQueue.ShutDown()
×
NEW
1270

×
1271
                c.addSwitchLBRuleQueue.ShutDown()
×
1272
                c.delSwitchLBRuleQueue.ShutDown()
×
1273
                c.updateSwitchLBRuleQueue.ShutDown()
×
1274

×
1275
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1276
                c.delVpcDNSQueue.ShutDown()
×
1277
        }
×
1278

1279
        c.addIPQueue.ShutDown()
×
1280
        c.updateIPQueue.ShutDown()
×
1281
        c.delIPQueue.ShutDown()
×
1282

×
1283
        c.addVirtualIPQueue.ShutDown()
×
1284
        c.updateVirtualIPQueue.ShutDown()
×
1285
        c.updateVirtualParentsQueue.ShutDown()
×
1286
        c.delVirtualIPQueue.ShutDown()
×
1287

×
1288
        c.addIptablesEipQueue.ShutDown()
×
1289
        c.updateIptablesEipQueue.ShutDown()
×
1290
        c.resetIptablesEipQueue.ShutDown()
×
1291
        c.delIptablesEipQueue.ShutDown()
×
1292

×
1293
        c.addIptablesFipQueue.ShutDown()
×
1294
        c.updateIptablesFipQueue.ShutDown()
×
1295
        c.delIptablesFipQueue.ShutDown()
×
1296

×
1297
        c.addIptablesDnatRuleQueue.ShutDown()
×
1298
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1299
        c.delIptablesDnatRuleQueue.ShutDown()
×
1300

×
1301
        c.addIptablesSnatRuleQueue.ShutDown()
×
1302
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1303
        c.delIptablesSnatRuleQueue.ShutDown()
×
1304

×
1305
        c.addQoSPolicyQueue.ShutDown()
×
1306
        c.updateQoSPolicyQueue.ShutDown()
×
1307
        c.delQoSPolicyQueue.ShutDown()
×
1308

×
1309
        c.addOvnEipQueue.ShutDown()
×
1310
        c.updateOvnEipQueue.ShutDown()
×
1311
        c.resetOvnEipQueue.ShutDown()
×
1312
        c.delOvnEipQueue.ShutDown()
×
1313

×
1314
        c.addOvnFipQueue.ShutDown()
×
1315
        c.updateOvnFipQueue.ShutDown()
×
1316
        c.delOvnFipQueue.ShutDown()
×
1317

×
1318
        c.addOvnSnatRuleQueue.ShutDown()
×
1319
        c.updateOvnSnatRuleQueue.ShutDown()
×
1320
        c.delOvnSnatRuleQueue.ShutDown()
×
1321

×
1322
        c.addOvnDnatRuleQueue.ShutDown()
×
1323
        c.updateOvnDnatRuleQueue.ShutDown()
×
1324
        c.delOvnDnatRuleQueue.ShutDown()
×
1325

×
1326
        if c.config.EnableNP {
×
1327
                c.updateNpQueue.ShutDown()
×
1328
                c.deleteNpQueue.ShutDown()
×
1329
        }
×
1330
        if c.config.EnableANP {
×
1331
                c.addAnpQueue.ShutDown()
×
1332
                c.updateAnpQueue.ShutDown()
×
1333
                c.deleteAnpQueue.ShutDown()
×
1334

×
1335
                c.addBanpQueue.ShutDown()
×
1336
                c.updateBanpQueue.ShutDown()
×
1337
                c.deleteBanpQueue.ShutDown()
×
1338

×
1339
                c.addCnpQueue.ShutDown()
×
1340
                c.updateCnpQueue.ShutDown()
×
1341
                c.deleteCnpQueue.ShutDown()
×
1342
        }
×
1343

1344
        if c.config.EnableDNSNameResolver {
×
1345
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1346
                c.deleteDNSNameResolverQueue.ShutDown()
×
1347
        }
×
1348

1349
        c.addOrUpdateSgQueue.ShutDown()
×
1350
        c.delSgQueue.ShutDown()
×
1351
        c.syncSgPortsQueue.ShutDown()
×
1352

×
1353
        c.addOrUpdateCsrQueue.ShutDown()
×
1354

×
1355
        if c.config.EnableLiveMigrationOptimize {
×
1356
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1357
        }
×
1358
}
1359

1360
func (c *Controller) startWorkers(ctx context.Context) {
×
1361
        klog.Info("Starting workers")
×
1362

×
1363
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1364
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1365
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1366

×
1367
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1368
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1369
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1370
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1371
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1373
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1374
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1375
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1376
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1377
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1378
        // add default and join subnet and wait them ready
×
1379
        for range c.config.WorkerNum {
×
1380
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1381
        }
×
1382
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1383
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1384
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1385
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1386
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1387
                klog.Infof("wait for subnets %v ready", subnets)
×
1388

×
1389
                return c.allSubnetReady(subnets...)
×
1390
        })
×
1391
        if err != nil {
×
1392
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1393
        }
×
1394

1395
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1396
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1397
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1398

×
1399
        // run node worker before handle any pods
×
1400
        for range c.config.WorkerNum {
×
1401
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1402
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1403
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1404
        }
×
1405
        for {
×
1406
                ready := true
×
1407
                time.Sleep(3 * time.Second)
×
1408
                nodes, err := c.nodesLister.List(labels.Everything())
×
1409
                if err != nil {
×
1410
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1411
                }
×
1412
                for _, node := range nodes {
×
1413
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1414
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1415
                                ready = false
×
1416
                                break
×
1417
                        }
1418
                }
1419
                if ready {
×
1420
                        break
×
1421
                }
1422
        }
1423

1424
        if c.config.EnableLb {
×
1425
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1426
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1427
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1428

×
NEW
1429
                go wait.Until(runWorker("add/update router lb rule", c.addRouterLBRuleQueue, c.handleAddOrUpdateRouterLBRule), time.Second, ctx.Done())
×
NEW
1430
                go wait.Until(runWorker("delete router lb rule", c.delRouterLBRuleQueue, c.handleDelRouterLBRule), time.Second, ctx.Done())
×
NEW
1431
                go wait.Until(runWorker("update router lb rule", c.updateRouterLBRuleQueue, c.handleUpdateRouterLBRule), time.Second, ctx.Done())
×
NEW
1432

×
1433
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1434
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1435
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1436

×
1437
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1438
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1439
                go wait.Until(func() {
×
1440
                        c.resyncVpcDNSConfig()
×
1441
                }, 5*time.Second, ctx.Done())
×
1442
        }
1443

1444
        for range c.config.WorkerNum {
×
1445
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1446
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1447
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1448

×
1449
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1450
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1451
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1452
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1453
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1454

×
1455
                if c.config.EnableLb {
×
1456
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1457
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1458
                }
×
1459

1460
                if c.config.EnableNP {
×
1461
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1462
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1463
                }
×
1464

1465
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1467
        }
1468

1469
        if c.config.EnableEipSnat {
×
1470
                go wait.Until(func() {
×
1471
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1472
                        c.resyncExternalGateway()
×
1473
                }, time.Second, ctx.Done())
×
1474

1475
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1476
                c.OVNNbClient.MonitorBFD()
×
1477
        }
1478
        // TODO: we should merge these two vpc nat config into one config and resync them together
1479
        go wait.Until(func() {
×
1480
                c.resyncVpcNatGwConfig()
×
1481
        }, time.Second, ctx.Done())
×
1482

1483
        go wait.Until(func() {
×
1484
                c.resyncVpcNatConfig()
×
1485
        }, time.Second, ctx.Done())
×
1486

1487
        if c.config.GCInterval != 0 {
×
1488
                go wait.Until(func() {
×
1489
                        if err := c.markAndCleanLSP(); err != nil {
×
1490
                                klog.Errorf("gc lsp error: %v", err)
×
1491
                        }
×
1492
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1493
        }
1494

1495
        go wait.Until(func() {
×
1496
                if err := c.inspectPod(); err != nil {
×
1497
                        klog.Errorf("inspection error: %v", err)
×
1498
                }
×
1499
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1500

1501
        if c.config.EnableExternalVpc {
×
1502
                go wait.Until(func() {
×
1503
                        c.syncExternalVpc()
×
1504
                }, 5*time.Second, ctx.Done())
×
1505
        }
1506

1507
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1508
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1509
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1510
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1511

×
1512
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1513
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1514
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1515
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1516

×
1517
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1518
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1519
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1520

×
1521
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1522
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1523
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1524

×
1525
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1526
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1527
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1528

×
1529
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1530

×
1531
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1532
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1533
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1534

×
1535
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1536
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1537
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1538
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1539

×
1540
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1541
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1542
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1543
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1544

×
1545
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1546
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1547
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1548

×
1549
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1550
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1551
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1552

×
1553
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1554
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1555
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1556

×
1557
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1558
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1559
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1560

×
1561
        if c.config.EnableANP {
×
1562
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1563
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1564
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1565

×
1566
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1567
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1568
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1569

×
1570
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1571
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1572
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1573
        }
×
1574

1575
        if c.config.EnableDNSNameResolver {
×
1576
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1577
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1578
        }
×
1579

1580
        if c.config.EnableLiveMigrationOptimize {
×
1581
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1582
        }
×
1583

1584
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1585

×
1586
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1587
}
1588

1589
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1590
        for _, lsName := range subnets {
2✔
1591
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1592
                if err != nil {
1✔
1593
                        klog.Error(err)
×
1594
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1595
                }
×
1596

1597
                if !exist {
2✔
1598
                        return false, nil
1✔
1599
                }
1✔
1600
        }
1601

1602
        return true, nil
1✔
1603
}
1604

1605
func (c *Controller) initResourceOnce() {
×
1606
        c.registerSubnetMetrics()
×
1607

×
1608
        if err := c.initNodeChassis(); err != nil {
×
1609
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1610
        }
×
1611

1612
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1613
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1614
        }
×
1615
        if err := c.syncSecurityGroup(); err != nil {
×
1616
                util.LogFatalAndExit(err, "failed to sync security group")
×
1617
        }
×
1618

1619
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1620
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1621
        }
×
1622

1623
        if err := c.initVpcNatGw(); err != nil {
×
1624
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1625
        }
×
1626
        if c.config.EnableLb {
×
1627
                if err := c.initVpcDNSConfig(); err != nil {
×
1628
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1629
                }
×
1630
        }
1631

1632
        // remove resources in ovndb that not exist any more in kubernetes resources
1633
        // process gc at last in case of affecting other init process
1634
        if err := c.gc(); err != nil {
×
1635
                util.LogFatalAndExit(err, "failed to run gc")
×
1636
        }
×
1637
}
1638

1639
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1640
        item, shutdown := queue.Get()
×
1641
        if shutdown {
×
1642
                return false
×
1643
        }
×
1644

1645
        err := func(item T) error {
×
1646
                defer queue.Done(item)
×
1647
                if err := handler(item); err != nil {
×
1648
                        queue.AddRateLimited(item)
×
1649
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1650
                }
×
1651
                queue.Forget(item)
×
1652
                return nil
×
1653
        }(item)
1654
        if err != nil {
×
1655
                utilruntime.HandleError(err)
×
1656
                return true
×
1657
        }
×
1658
        return true
×
1659
}
1660

1661
func getWorkItemKey(obj any) string {
×
1662
        switch v := obj.(type) {
×
1663
        case string:
×
1664
                return v
×
1665
        case *vpcService:
×
1666
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1667
        case *AdminNetworkPolicyChangedDelta:
×
1668
                return v.key
×
1669
        case *SwitchLBRuleInfo:
×
1670
                return v.Name
×
1671
        default:
×
1672
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1673
                if err != nil {
×
1674
                        utilruntime.HandleError(err)
×
1675
                        return ""
×
1676
                }
×
1677
                return key
×
1678
        }
1679
}
1680

1681
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1682
        return func() {
×
1683
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1684
                }
×
1685
        }
1686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc