• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21743134951

06 Feb 2026 07:57AM UTC coverage: 22.892% (-0.1%) from 23.008%
21743134951

push

github

web-flow
cni-server: add static fdb entry for subnets with u2o enabled (#6269)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

0 of 297 new or added lines in 10 files covered. (0.0%)

1 existing line in 1 file now uncovered.

12434 of 54316 relevant lines covered (22.89%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.14
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync/atomic"
9
        "time"
10

11
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
12
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
13
        "github.com/puzpuzpuz/xsync/v4"
14
        "golang.org/x/time/rate"
15
        corev1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/discovery"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        appsv1 "k8s.io/client-go/listers/apps/v1"
26
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
27
        v1 "k8s.io/client-go/listers/core/v1"
28
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
29
        netv1 "k8s.io/client-go/listers/networking/v1"
30
        "k8s.io/client-go/tools/cache"
31
        "k8s.io/client-go/tools/record"
32
        "k8s.io/client-go/util/workqueue"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/keymutex"
35
        "k8s.io/utils/set"
36
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
37
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
38
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
39
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
40
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
41

42
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
43
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
44
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
45
        "github.com/kubeovn/kube-ovn/pkg/informer"
46
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
47
        "github.com/kubeovn/kube-ovn/pkg/ovs"
48
        "github.com/kubeovn/kube-ovn/pkg/util"
49
)
50

51
// controllerAgentName is the component name set on the event source of the
// recorder that Run creates.
const controllerAgentName = "kube-ovn-controller"
52

53
// Short string keys named after the object kinds they refer to (logical
// switch, logical router, port group, network policy, and so on). Their
// consumers are outside this view; presumably they are used as OVN
// external-ID keys. TODO confirm against usages.
const (
54
        logicalSwitchKey              = "ls"
55
        logicalRouterKey              = "lr"
56
        portGroupKey                  = "pg"
57
        networkPolicyKey              = "np"
58
        sgKey                         = "sg"
59
        sgsKey                        = "security_groups"
60
        u2oKey                        = "u2o"
61
        adminNetworkPolicyKey         = "anp"
62
        baselineAdminNetworkPolicyKey = "banp"
63
        ippoolKey                     = "ippool"
64
        clusterNetworkPolicyKey       = "cnp"
65
)
66

67
// Controller is the main kube-ovn controller. It watches namespaces, pods,
// nodes, services, and endpoints and operates OVN accordingly.
//
// Most fields come in per-resource groups that follow one naming pattern:
//   - xxxLister reads objects of that kind through a generated lister;
//   - xxxSynced is the cache.InformerSynced check for the matching informer;
//   - xxxQueue fields are rate-limited work queues whose items are either
//     strings or pointers;
//   - xxxKeyMutex is a keymutex.KeyMutex for that resource kind.
68
type Controller struct {
69
        config *Configuration
70

71
        ipam           *ovnipam.IPAM
72
        namedPort      *NamedPort
73
        anpPrioNameMap map[int32]string
74
        anpNamePrioMap map[string]int32
75
        bnpPrioNameMap map[int32]string
76
        bnpNamePrioMap map[string]int32
77

78
        OVNNbClient ovs.NbClient
79
        OVNSbClient ovs.SbClient
80

81
        // ExternalGatewayType defines the external gateway type, such as centralized.
82
        ExternalGatewayType string
83

84
        podsLister             v1.PodLister
85
        podsSynced             cache.InformerSynced
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
121
        switchLBRuleSynced      cache.InformerSynced
122
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
123
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
124
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
125

126
        vpcDNSLister           kubeovnlister.VpcDnsLister
127
        vpcDNSSynced           cache.InformerSynced
128
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
129
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
130

131
        subnetsLister           kubeovnlister.SubnetLister
132
        subnetSynced            cache.InformerSynced
133
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
134
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
135
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
136
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
137
        subnetKeyMutex          keymutex.KeyMutex
138

139
        ippoolLister            kubeovnlister.IPPoolLister
140
        ippoolSynced            cache.InformerSynced
141
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
142
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
143
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
144
        ippoolKeyMutex          keymutex.KeyMutex
145

146
        ipsLister     kubeovnlister.IPLister
147
        ipSynced      cache.InformerSynced
148
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
149
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
150
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
151

152
        virtualIpsLister          kubeovnlister.VipLister
153
        virtualIpsSynced          cache.InformerSynced
154
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
155
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
156
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
157
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
158

159
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
160
        iptablesEipSynced      cache.InformerSynced
161
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
162
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
163
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
164
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
165

166
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
167
        iptablesFipSynced      cache.InformerSynced
168
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
169
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
170
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
171

172
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
173
        iptablesDnatRuleSynced      cache.InformerSynced
174
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
175
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
176
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
177

178
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
179
        iptablesSnatRuleSynced      cache.InformerSynced
180
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
181
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
182
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
183

184
        ovnEipsLister     kubeovnlister.OvnEipLister
185
        ovnEipSynced      cache.InformerSynced
186
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
187
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
188
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
189
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
190

191
        ovnFipsLister     kubeovnlister.OvnFipLister
192
        ovnFipSynced      cache.InformerSynced
193
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
194
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
195
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
196

197
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
198
        ovnSnatRuleSynced      cache.InformerSynced
199
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
201
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
202

203
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
204
        ovnDnatRuleSynced      cache.InformerSynced
205
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
206
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
207
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
208

209
        providerNetworksLister kubeovnlister.ProviderNetworkLister
210
        providerNetworkSynced  cache.InformerSynced
211

212
        vlansLister     kubeovnlister.VlanLister
213
        vlanSynced      cache.InformerSynced
214
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
215
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
216
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
217
        vlanKeyMutex    keymutex.KeyMutex
218

219
        namespacesLister  v1.NamespaceLister
220
        namespacesSynced  cache.InformerSynced
221
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
222
        nsKeyMutex        keymutex.KeyMutex
223

224
        nodesLister     v1.NodeLister
225
        nodesSynced     cache.InformerSynced
226
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
227
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
228
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
229
        nodeKeyMutex    keymutex.KeyMutex
230

231
        servicesLister     v1.ServiceLister
232
        serviceSynced      cache.InformerSynced
233
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
234
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
235
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
236
        svcKeyMutex        keymutex.KeyMutex
237

238
        endpointSlicesLister          discoveryv1.EndpointSliceLister
239
        endpointSlicesSynced          cache.InformerSynced
240
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
241
        epKeyMutex                    keymutex.KeyMutex
242

243
        deploymentsLister appsv1.DeploymentLister
244
        deploymentsSynced cache.InformerSynced
245

246
        npsLister     netv1.NetworkPolicyLister
247
        npsSynced     cache.InformerSynced
248
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
249
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
250
        npKeyMutex    keymutex.KeyMutex
251

252
        sgsLister          kubeovnlister.SecurityGroupLister
253
        sgSynced           cache.InformerSynced
254
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
255
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
256
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
257
        sgKeyMutex         keymutex.KeyMutex
258

259
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
260
        qosPolicySynced      cache.InformerSynced
261
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
262
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
263
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
264

265
        configMapsLister v1.ConfigMapLister
266
        configMapsSynced cache.InformerSynced
267

268
        anpsLister     anplister.AdminNetworkPolicyLister
269
        anpsSynced     cache.InformerSynced
270
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
271
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
272
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
273
        anpKeyMutex    keymutex.KeyMutex
274

275
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
276
        dnsNameResolversSynced          cache.InformerSynced
277
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
278
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
279

280
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
281
        banpsSynced     cache.InformerSynced
282
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
283
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
284
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
285
        banpKeyMutex    keymutex.KeyMutex
286

287
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
288
        cnpsSynced     cache.InformerSynced
289
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
290
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
291
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
292
        cnpKeyMutex    keymutex.KeyMutex
293

294
        csrLister           certListerv1.CertificateSigningRequestLister
295
        csrSynced           cache.InformerSynced
296
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
297

298
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
299
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
300
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
301

302
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
303
        netAttachSynced          cache.InformerSynced
304
        netAttachInformerFactory netAttach.SharedInformerFactory
305

306
        recorder               record.EventRecorder
307
        informerFactory        kubeinformers.SharedInformerFactory
308
        cmInformerFactory      kubeinformers.SharedInformerFactory
309
        deployInformerFactory  kubeinformers.SharedInformerFactory
310
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
311
        anpInformerFactory     anpinformer.SharedInformerFactory
312

313
        // Database health check
314
        dbFailureCount int
315

316
        distributedSubnetNeedSync atomic.Bool
317
}
318

319
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
320
        if rateLimiter == nil {
2✔
321
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
322
        }
1✔
323
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
324
}
325

326
// Run creates and runs a new ovn controller
327
func Run(ctx context.Context, config *Configuration) {
×
328
        klog.V(4).Info("Creating event broadcaster")
×
329
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
330
        eventBroadcaster.StartLogging(klog.Infof)
×
331
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
332
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
333
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
334
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
335
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
336
        )
×
337

×
338
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
339
        if err != nil {
×
340
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
341
        }
×
342

343
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
344
                kubeinformers.WithTransform(util.TrimManagedFields),
×
345
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
346
                        listOption.AllowWatchBookmarks = true
×
347
                }))
×
348
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
349
                kubeinformers.WithNamespace(config.PodNamespace),
×
350
                kubeinformers.WithTransform(util.TrimManagedFields),
×
351
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
352
                        listOption.AllowWatchBookmarks = true
×
353
                }))
×
354
        // deployment informer used to list/watch vpc egress gateway workloads
355
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
356
                kubeinformers.WithTransform(util.TrimManagedFields),
×
357
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
358
                        listOption.AllowWatchBookmarks = true
×
359
                        listOption.LabelSelector = selector.String()
×
360
                }))
×
361
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
362
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
363
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
364
                        listOption.AllowWatchBookmarks = true
×
365
                }))
×
366
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
367
                anpinformer.WithTransform(util.TrimManagedFields),
×
368
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
369
                        listOption.AllowWatchBookmarks = true
×
370
                }))
×
371
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
372
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
373
                        listOption.AllowWatchBookmarks = true
×
374
                }),
×
375
        )
376
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
377
                informer.WithTransform(util.TrimManagedFields),
×
378
        )
×
379

×
380
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
381
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
382
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
383
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
384
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
385
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
386
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
387
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
388
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
389
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
390
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
391
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
392
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
393
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
394
        podInformer := informerFactory.Core().V1().Pods()
×
395
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
396
        nodeInformer := informerFactory.Core().V1().Nodes()
×
397
        serviceInformer := informerFactory.Core().V1().Services()
×
398
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
399
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
400
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
401
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
402
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
403
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
404
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
405
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
406
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
407
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
408
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
409
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
410
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
411
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
412
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
413
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
414
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
415

×
416
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
417
        controller := &Controller{
×
418
                config:             config,
×
419
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
420
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
421
                ipam:               ovnipam.NewIPAM(),
×
422
                namedPort:          NewNamedPort(),
×
423

×
424
                vpcsLister:           vpcInformer.Lister(),
×
425
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
426
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
427
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
428
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
429
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
430
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
431

×
432
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
433
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
434
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
435
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
436
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
437
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
438
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
439
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
440
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
441
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
442
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
443
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
444
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
445
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
446
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
447
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
448
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
449

×
450
                subnetsLister:           subnetInformer.Lister(),
×
451
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
452
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
453
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
454
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
455
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
456
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
457

×
458
                ippoolLister:            ippoolInformer.Lister(),
×
459
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
460
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
461
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
462
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
463
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
464

×
465
                ipsLister:     ipInformer.Lister(),
×
466
                ipSynced:      ipInformer.Informer().HasSynced,
×
467
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
468
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
469
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
470

×
471
                virtualIpsLister:          virtualIPInformer.Lister(),
×
472
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
473
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
474
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
475
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
476
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
477

×
478
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
479
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
480
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
481
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
482
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
483
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
484

×
485
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
486
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
487
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
488
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
489
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
490

×
491
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
492
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
493
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
494
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
495
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
496

×
497
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
498
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
499
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
500
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
501
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
502

×
503
                vlansLister:     vlanInformer.Lister(),
×
504
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
505
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
506
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
507
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
508
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
509

×
510
                providerNetworksLister: providerNetworkInformer.Lister(),
×
511
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
512

×
513
                podsLister:          podInformer.Lister(),
×
514
                podsSynced:          podInformer.Informer().HasSynced,
×
515
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
516
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
517
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
518
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
519
                                Name:          "DeletePod",
×
520
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
521
                        },
×
522
                ),
×
523
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
524
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
525

×
526
                namespacesLister:  namespaceInformer.Lister(),
×
527
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
528
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
529
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
530

×
531
                nodesLister:     nodeInformer.Lister(),
×
532
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
533
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
534
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
535
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
536
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
537

×
538
                servicesLister:     serviceInformer.Lister(),
×
539
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
540
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
541
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
542
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
543
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
544

×
545
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
546
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
547
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
548
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
549

×
550
                deploymentsLister: deploymentInformer.Lister(),
×
551
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
552

×
553
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
554
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
555
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
556
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
557
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
558

×
559
                configMapsLister: configMapInformer.Lister(),
×
560
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
561

×
562
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
563
                sgsLister:          sgInformer.Lister(),
×
564
                sgSynced:           sgInformer.Informer().HasSynced,
×
565
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
566
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
567
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
568

×
569
                ovnEipsLister:     ovnEipInformer.Lister(),
×
570
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
571
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
572
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
573
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
574
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
575

×
576
                ovnFipsLister:     ovnFipInformer.Lister(),
×
577
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
578
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
579
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
580
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
581

×
582
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
583
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
584
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
585
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
586
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
587

×
588
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
589
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
590
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
591
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
592
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
593

×
594
                csrLister:           csrInformer.Lister(),
×
595
                csrSynced:           csrInformer.Informer().HasSynced,
×
596
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
597

×
598
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
599
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
600
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
601

×
602
                netAttachLister:          netAttachInformer.Lister(),
×
603
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
604
                netAttachInformerFactory: attachNetInformerFactory,
×
605

×
606
                recorder:               recorder,
×
607
                informerFactory:        informerFactory,
×
608
                cmInformerFactory:      cmInformerFactory,
×
609
                deployInformerFactory:  deployInformerFactory,
×
610
                kubeovnInformerFactory: kubeovnInformerFactory,
×
611
                anpInformerFactory:     anpInformerFactory,
×
612
        }
×
613

×
614
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
615
                config.OvnNbAddr,
×
616
                config.OvnTimeout,
×
617
                config.OvsDbConnectTimeout,
×
618
                config.OvsDbInactivityTimeout,
×
619
                config.OvsDbConnectMaxRetry,
×
620
        ); err != nil {
×
621
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
622
        }
×
623
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
624
                config.OvnSbAddr,
×
625
                config.OvnTimeout,
×
626
                config.OvsDbConnectTimeout,
×
627
                config.OvsDbInactivityTimeout,
×
628
                config.OvsDbConnectMaxRetry,
×
629
        ); err != nil {
×
630
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
631
        }
×
632
        if config.EnableLb {
×
633
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
634
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
635
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
636
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
637
                        "DeleteSwitchLBRule",
×
638
                        workqueue.NewTypedMaxOfRateLimiter(
×
639
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
640
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
641
                        ),
×
642
                )
×
643
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
644
                        "UpdateSwitchLBRule",
×
645
                        workqueue.NewTypedMaxOfRateLimiter(
×
646
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
647
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
648
                        ),
×
649
                )
×
650

×
651
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
652
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
653
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
654
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
655
        }
×
656

657
        if config.EnableNP {
×
658
                controller.npsLister = npInformer.Lister()
×
659
                controller.npsSynced = npInformer.Informer().HasSynced
×
660
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
661
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
662
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
663
        }
×
664

665
        if config.EnableANP {
×
666
                controller.anpsLister = anpInformer.Lister()
×
667
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
668
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
669
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
670
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
671
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
672

×
673
                controller.banpsLister = banpInformer.Lister()
×
674
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
675
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
676
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
677
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
678
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
679

×
680
                controller.cnpsLister = cnpInformer.Lister()
×
681
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
682
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
683
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
684
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
685
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
686
        }
×
687

688
        if config.EnableDNSNameResolver {
×
689
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
690
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
691
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
692
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
693
        }
×
694

695
        defer controller.shutdown()
×
696
        klog.Info("Starting OVN controller")
×
697

×
698
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
699
        // NAD CRD is optional, so we check if it exists before starting the informer
×
700
        controller.StartNetAttachInformerFactory(ctx)
×
701

×
702
        // Wait for the caches to be synced before starting workers
×
703
        controller.informerFactory.Start(ctx.Done())
×
704
        controller.cmInformerFactory.Start(ctx.Done())
×
705
        controller.deployInformerFactory.Start(ctx.Done())
×
706
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
707
        controller.anpInformerFactory.Start(ctx.Done())
×
708
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
709

×
710
        klog.Info("Waiting for informer caches to sync")
×
711
        cacheSyncs := []cache.InformerSynced{
×
712
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
713
                controller.vpcSynced, controller.subnetSynced,
×
714
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
715
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
716
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
717
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
718
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
719
                controller.ovnDnatRuleSynced,
×
720
        }
×
721
        if controller.config.EnableLb {
×
722
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
723
        }
×
724
        if controller.config.EnableNP {
×
725
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
726
        }
×
727
        if controller.config.EnableANP {
×
728
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
729
        }
×
730
        if controller.config.EnableDNSNameResolver {
×
731
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
732
        }
×
733

734
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
735
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
736
        }
×
737

738
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
739
                AddFunc:    controller.enqueueAddPod,
×
740
                DeleteFunc: controller.enqueueDeletePod,
×
741
                UpdateFunc: controller.enqueueUpdatePod,
×
742
        }); err != nil {
×
743
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
744
        }
×
745

746
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
747
                AddFunc:    controller.enqueueAddNamespace,
×
748
                UpdateFunc: controller.enqueueUpdateNamespace,
×
749
                DeleteFunc: controller.enqueueDeleteNamespace,
×
750
        }); err != nil {
×
751
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
752
        }
×
753

754
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
755
                AddFunc:    controller.enqueueAddNode,
×
756
                UpdateFunc: controller.enqueueUpdateNode,
×
757
                DeleteFunc: controller.enqueueDeleteNode,
×
758
        }); err != nil {
×
759
                util.LogFatalAndExit(err, "failed to add node event handler")
×
760
        }
×
761

762
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
763
                AddFunc:    controller.enqueueAddService,
×
764
                DeleteFunc: controller.enqueueDeleteService,
×
765
                UpdateFunc: controller.enqueueUpdateService,
×
766
        }); err != nil {
×
767
                util.LogFatalAndExit(err, "failed to add service event handler")
×
768
        }
×
769

770
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
771
                AddFunc:    controller.enqueueAddEndpointSlice,
×
772
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
773
        }); err != nil {
×
774
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
775
        }
×
776

777
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
778
                AddFunc:    controller.enqueueAddDeployment,
×
779
                UpdateFunc: controller.enqueueUpdateDeployment,
×
780
        }); err != nil {
×
781
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
782
        }
×
783

784
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
785
                AddFunc:    controller.enqueueAddVpc,
×
786
                UpdateFunc: controller.enqueueUpdateVpc,
×
787
                DeleteFunc: controller.enqueueDelVpc,
×
788
        }); err != nil {
×
789
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
790
        }
×
791

792
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
793
                AddFunc:    controller.enqueueAddVpcNatGw,
×
794
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
795
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
796
        }); err != nil {
×
797
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
798
        }
×
799

800
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
801
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
802
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
803
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
804
        }); err != nil {
×
805
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
806
        }
×
807

808
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
809
                AddFunc:    controller.enqueueAddSubnet,
×
810
                UpdateFunc: controller.enqueueUpdateSubnet,
×
811
                DeleteFunc: controller.enqueueDeleteSubnet,
×
812
        }); err != nil {
×
813
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
814
        }
×
815

816
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
817
                AddFunc:    controller.enqueueAddIPPool,
×
818
                UpdateFunc: controller.enqueueUpdateIPPool,
×
819
                DeleteFunc: controller.enqueueDeleteIPPool,
×
820
        }); err != nil {
×
821
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
822
        }
×
823

824
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
825
                AddFunc:    controller.enqueueAddIP,
×
826
                UpdateFunc: controller.enqueueUpdateIP,
×
827
                DeleteFunc: controller.enqueueDelIP,
×
828
        }); err != nil {
×
829
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
830
        }
×
831

832
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
833
                AddFunc:    controller.enqueueAddVlan,
×
834
                DeleteFunc: controller.enqueueDelVlan,
×
835
                UpdateFunc: controller.enqueueUpdateVlan,
×
836
        }); err != nil {
×
837
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
838
        }
×
839

840
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
841
                AddFunc:    controller.enqueueAddSg,
×
842
                DeleteFunc: controller.enqueueDeleteSg,
×
843
                UpdateFunc: controller.enqueueUpdateSg,
×
844
        }); err != nil {
×
845
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
846
        }
×
847

848
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
849
                AddFunc:    controller.enqueueAddVirtualIP,
×
850
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
851
                DeleteFunc: controller.enqueueDelVirtualIP,
×
852
        }); err != nil {
×
853
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
854
        }
×
855

856
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
857
                AddFunc:    controller.enqueueAddIptablesEip,
×
858
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
859
                DeleteFunc: controller.enqueueDelIptablesEip,
×
860
        }); err != nil {
×
861
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
862
        }
×
863

864
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
865
                AddFunc:    controller.enqueueAddIptablesFip,
×
866
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
867
                DeleteFunc: controller.enqueueDelIptablesFip,
×
868
        }); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
870
        }
×
871

872
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
874
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
875
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
876
        }); err != nil {
×
877
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
878
        }
×
879

880
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
882
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
883
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
884
        }); err != nil {
×
885
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
886
        }
×
887

888
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                AddFunc:    controller.enqueueAddOvnEip,
×
890
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
891
                DeleteFunc: controller.enqueueDelOvnEip,
×
892
        }); err != nil {
×
893
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
894
        }
×
895

896
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                AddFunc:    controller.enqueueAddOvnFip,
×
898
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
899
                DeleteFunc: controller.enqueueDelOvnFip,
×
900
        }); err != nil {
×
901
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
902
        }
×
903

904
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
906
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
907
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
908
        }); err != nil {
×
909
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
910
        }
×
911

912
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
914
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
915
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
916
        }); err != nil {
×
917
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
918
        }
×
919

920
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
921
                AddFunc:    controller.enqueueAddQoSPolicy,
×
922
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
923
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
924
        }); err != nil {
×
925
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
926
        }
×
927

928
        if config.EnableLb {
×
929
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
931
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
932
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
933
                }); err != nil {
×
934
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
935
                }
×
936

937
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
938
                        AddFunc:    controller.enqueueAddVpcDNS,
×
939
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
940
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
941
                }); err != nil {
×
942
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
943
                }
×
944
        }
945

946
        if config.EnableNP {
×
947
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
948
                        AddFunc:    controller.enqueueAddNp,
×
949
                        UpdateFunc: controller.enqueueUpdateNp,
×
950
                        DeleteFunc: controller.enqueueDeleteNp,
×
951
                }); err != nil {
×
952
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
953
                }
×
954
        }
955

956
        if config.EnableANP {
×
957
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
958
                        AddFunc:    controller.enqueueAddAnp,
×
959
                        UpdateFunc: controller.enqueueUpdateAnp,
×
960
                        DeleteFunc: controller.enqueueDeleteAnp,
×
961
                }); err != nil {
×
962
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
963
                }
×
964

965
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
966
                        AddFunc:    controller.enqueueAddBanp,
×
967
                        UpdateFunc: controller.enqueueUpdateBanp,
×
968
                        DeleteFunc: controller.enqueueDeleteBanp,
×
969
                }); err != nil {
×
970
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
971
                }
×
972

973
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
974
                        AddFunc:    controller.enqueueAddCnp,
×
975
                        UpdateFunc: controller.enqueueUpdateCnp,
×
976
                        DeleteFunc: controller.enqueueDeleteCnp,
×
977
                }); err != nil {
×
978
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
979
                }
×
980

981
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
982
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
983
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
984
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
985
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
986
        }
987

988
        if config.EnableDNSNameResolver {
×
989
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
990
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
991
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
992
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
993
                }); err != nil {
×
994
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
995
                }
×
996
        }
997

998
        if config.EnableOVNIPSec {
×
999
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1000
                        AddFunc:    controller.enqueueAddCsr,
×
1001
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1002
                        // no need to add delete func for csr
×
1003
                }); err != nil {
×
1004
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1005
                }
×
1006
        }
1007

1008
        controller.Run(ctx)
×
1009
}
1010

1011
// Run will set up the event handlers for types we are interested in, as well
1012
// as syncing informer caches and starting workers. It will block until stopCh
1013
// is closed, at which point it will shutdown the workqueue and wait for
1014
// workers to finish processing their current work items.
1015
func (c *Controller) Run(ctx context.Context) {
×
1016
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1017
        // Otherwise, the init process should be placed after all workers have already started working
×
1018
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1019
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1020
        }
×
1021

1022
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1023
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1024
        }
×
1025

1026
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1027
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1028
        }
×
1029

1030
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1031
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1032
        }
×
1033

1034
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1035
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1036
        }
×
1037

1038
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1039
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1040
        }
×
1041

1042
        if err := c.InitOVN(); err != nil {
×
1043
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1044
        }
×
1045

1046
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1047
        if err := c.syncIPCR(); err != nil {
×
1048
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1049
        }
×
1050

1051
        if err := c.syncFinalizers(); err != nil {
×
1052
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1053
        }
×
1054

1055
        if err := c.InitIPAM(); err != nil {
×
1056
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1057
        }
×
1058

1059
        if err := c.syncNodeRoutes(); err != nil {
×
1060
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1061
        }
×
1062

1063
        if err := c.syncSubnetCR(); err != nil {
×
1064
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1065
        }
×
1066

1067
        if err := c.syncVlanCR(); err != nil {
×
1068
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1069
        }
×
1070

1071
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1072
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1073
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1074
                }
×
1075
        }
1076

1077
        // start workers to do all the network operations
1078
        c.startWorkers(ctx)
×
1079

×
1080
        c.initResourceOnce()
×
1081
        <-ctx.Done()
×
1082
        klog.Info("Shutting down workers")
×
NEW
1083

×
NEW
1084
        c.OVNNbClient.Close()
×
NEW
1085
        c.OVNSbClient.Close()
×
1086
}
1087

1088
func (c *Controller) dbStatus() {
×
1089
        const maxFailures = 5
×
1090

×
1091
        done := make(chan error, 2)
×
1092
        go func() {
×
1093
                done <- c.OVNNbClient.Echo(context.Background())
×
1094
        }()
×
1095
        go func() {
×
1096
                done <- c.OVNSbClient.Echo(context.Background())
×
1097
        }()
×
1098

1099
        resultsReceived := 0
×
1100
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1101

×
1102
        for resultsReceived < 2 {
×
1103
                select {
×
1104
                case err := <-done:
×
1105
                        resultsReceived++
×
1106
                        if err != nil {
×
1107
                                c.dbFailureCount++
×
1108
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1109
                                if c.dbFailureCount >= maxFailures {
×
1110
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1111
                                }
×
1112
                                return
×
1113
                        }
1114
                case <-timeout:
×
1115
                        c.dbFailureCount++
×
1116
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1117
                        if c.dbFailureCount >= maxFailures {
×
1118
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1119
                        }
×
1120
                        return
×
1121
                }
1122
        }
1123

1124
        if c.dbFailureCount > 0 {
×
1125
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1126
                c.dbFailureCount = 0
×
1127
        }
×
1128
}
1129

1130
func (c *Controller) shutdown() {
×
1131
        utilruntime.HandleCrash()
×
1132

×
1133
        c.addOrUpdatePodQueue.ShutDown()
×
1134
        c.deletePodQueue.ShutDown()
×
1135
        c.updatePodSecurityQueue.ShutDown()
×
1136

×
1137
        c.addNamespaceQueue.ShutDown()
×
1138

×
1139
        c.addOrUpdateSubnetQueue.ShutDown()
×
1140
        c.deleteSubnetQueue.ShutDown()
×
1141
        c.updateSubnetStatusQueue.ShutDown()
×
1142
        c.syncVirtualPortsQueue.ShutDown()
×
1143

×
1144
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1145
        c.updateIPPoolStatusQueue.ShutDown()
×
1146
        c.deleteIPPoolQueue.ShutDown()
×
1147

×
1148
        c.addNodeQueue.ShutDown()
×
1149
        c.updateNodeQueue.ShutDown()
×
1150
        c.deleteNodeQueue.ShutDown()
×
1151

×
1152
        c.addServiceQueue.ShutDown()
×
1153
        c.deleteServiceQueue.ShutDown()
×
1154
        c.updateServiceQueue.ShutDown()
×
1155
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1156

×
1157
        c.addVlanQueue.ShutDown()
×
1158
        c.delVlanQueue.ShutDown()
×
1159
        c.updateVlanQueue.ShutDown()
×
1160

×
1161
        c.addOrUpdateVpcQueue.ShutDown()
×
1162
        c.updateVpcStatusQueue.ShutDown()
×
1163
        c.delVpcQueue.ShutDown()
×
1164

×
1165
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1166
        c.initVpcNatGatewayQueue.ShutDown()
×
1167
        c.delVpcNatGatewayQueue.ShutDown()
×
1168
        c.updateVpcEipQueue.ShutDown()
×
1169
        c.updateVpcFloatingIPQueue.ShutDown()
×
1170
        c.updateVpcDnatQueue.ShutDown()
×
1171
        c.updateVpcSnatQueue.ShutDown()
×
1172
        c.updateVpcSubnetQueue.ShutDown()
×
1173

×
1174
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1175
        c.delVpcEgressGatewayQueue.ShutDown()
×
1176

×
1177
        if c.config.EnableLb {
×
1178
                c.addSwitchLBRuleQueue.ShutDown()
×
1179
                c.delSwitchLBRuleQueue.ShutDown()
×
1180
                c.updateSwitchLBRuleQueue.ShutDown()
×
1181

×
1182
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1183
                c.delVpcDNSQueue.ShutDown()
×
1184
        }
×
1185

1186
        c.addIPQueue.ShutDown()
×
1187
        c.updateIPQueue.ShutDown()
×
1188
        c.delIPQueue.ShutDown()
×
1189

×
1190
        c.addVirtualIPQueue.ShutDown()
×
1191
        c.updateVirtualIPQueue.ShutDown()
×
1192
        c.updateVirtualParentsQueue.ShutDown()
×
1193
        c.delVirtualIPQueue.ShutDown()
×
1194

×
1195
        c.addIptablesEipQueue.ShutDown()
×
1196
        c.updateIptablesEipQueue.ShutDown()
×
1197
        c.resetIptablesEipQueue.ShutDown()
×
1198
        c.delIptablesEipQueue.ShutDown()
×
1199

×
1200
        c.addIptablesFipQueue.ShutDown()
×
1201
        c.updateIptablesFipQueue.ShutDown()
×
1202
        c.delIptablesFipQueue.ShutDown()
×
1203

×
1204
        c.addIptablesDnatRuleQueue.ShutDown()
×
1205
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1206
        c.delIptablesDnatRuleQueue.ShutDown()
×
1207

×
1208
        c.addIptablesSnatRuleQueue.ShutDown()
×
1209
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1210
        c.delIptablesSnatRuleQueue.ShutDown()
×
1211

×
1212
        c.addQoSPolicyQueue.ShutDown()
×
1213
        c.updateQoSPolicyQueue.ShutDown()
×
1214
        c.delQoSPolicyQueue.ShutDown()
×
1215

×
1216
        c.addOvnEipQueue.ShutDown()
×
1217
        c.updateOvnEipQueue.ShutDown()
×
1218
        c.resetOvnEipQueue.ShutDown()
×
1219
        c.delOvnEipQueue.ShutDown()
×
1220

×
1221
        c.addOvnFipQueue.ShutDown()
×
1222
        c.updateOvnFipQueue.ShutDown()
×
1223
        c.delOvnFipQueue.ShutDown()
×
1224

×
1225
        c.addOvnSnatRuleQueue.ShutDown()
×
1226
        c.updateOvnSnatRuleQueue.ShutDown()
×
1227
        c.delOvnSnatRuleQueue.ShutDown()
×
1228

×
1229
        c.addOvnDnatRuleQueue.ShutDown()
×
1230
        c.updateOvnDnatRuleQueue.ShutDown()
×
1231
        c.delOvnDnatRuleQueue.ShutDown()
×
1232

×
1233
        if c.config.EnableNP {
×
1234
                c.updateNpQueue.ShutDown()
×
1235
                c.deleteNpQueue.ShutDown()
×
1236
        }
×
1237
        if c.config.EnableANP {
×
1238
                c.addAnpQueue.ShutDown()
×
1239
                c.updateAnpQueue.ShutDown()
×
1240
                c.deleteAnpQueue.ShutDown()
×
1241

×
1242
                c.addBanpQueue.ShutDown()
×
1243
                c.updateBanpQueue.ShutDown()
×
1244
                c.deleteBanpQueue.ShutDown()
×
1245

×
1246
                c.addCnpQueue.ShutDown()
×
1247
                c.updateCnpQueue.ShutDown()
×
1248
                c.deleteCnpQueue.ShutDown()
×
1249
        }
×
1250

1251
        if c.config.EnableDNSNameResolver {
×
1252
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1253
                c.deleteDNSNameResolverQueue.ShutDown()
×
1254
        }
×
1255

1256
        c.addOrUpdateSgQueue.ShutDown()
×
1257
        c.delSgQueue.ShutDown()
×
1258
        c.syncSgPortsQueue.ShutDown()
×
1259

×
1260
        c.addOrUpdateCsrQueue.ShutDown()
×
1261

×
1262
        if c.config.EnableLiveMigrationOptimize {
×
1263
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1264
        }
×
1265
}
1266

1267
func (c *Controller) startWorkers(ctx context.Context) {
×
1268
        klog.Info("Starting workers")
×
1269

×
1270
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1272
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1273

×
1274
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1276
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1277
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1278
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1280
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1281
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1282
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1283
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1284
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1285
        // add default and join subnet and wait them ready
×
1286
        for range c.config.WorkerNum {
×
1287
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1288
        }
×
1289
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1290
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1291
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1292
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1293
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1294
                klog.Infof("wait for subnets %v ready", subnets)
×
1295

×
1296
                return c.allSubnetReady(subnets...)
×
1297
        })
×
1298
        if err != nil {
×
1299
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1300
        }
×
1301

1302
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1303
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1305

×
1306
        // run node worker before handle any pods
×
1307
        for range c.config.WorkerNum {
×
1308
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1309
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1310
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1311
        }
×
1312
        for {
×
1313
                ready := true
×
1314
                time.Sleep(3 * time.Second)
×
1315
                nodes, err := c.nodesLister.List(labels.Everything())
×
1316
                if err != nil {
×
1317
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1318
                }
×
1319
                for _, node := range nodes {
×
1320
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1321
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1322
                                ready = false
×
1323
                                break
×
1324
                        }
1325
                }
1326
                if ready {
×
1327
                        break
×
1328
                }
1329
        }
1330

1331
        if c.config.EnableLb {
×
1332
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1333
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1334
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1335

×
1336
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1337
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1338
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1339

×
1340
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1341
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1342
                go wait.Until(func() {
×
1343
                        c.resyncVpcDNSConfig()
×
1344
                }, 5*time.Second, ctx.Done())
×
1345
        }
1346

1347
        for range c.config.WorkerNum {
×
1348
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1349
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1350
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1351

×
1352
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1353
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1354
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1355
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1356
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1357

×
1358
                if c.config.EnableLb {
×
1359
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1360
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1361
                }
×
1362

1363
                if c.config.EnableNP {
×
1364
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1365
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1366
                }
×
1367

1368
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1369
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1370
        }
1371

1372
        if c.config.EnableEipSnat {
×
1373
                go wait.Until(func() {
×
1374
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1375
                        c.resyncExternalGateway()
×
1376
                }, time.Second, ctx.Done())
×
1377

1378
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1379
                c.OVNNbClient.MonitorBFD()
×
1380
        }
1381
        // TODO: we should merge these two vpc nat config into one config and resync them together
1382
        go wait.Until(func() {
×
1383
                c.resyncVpcNatGwConfig()
×
1384
        }, time.Second, ctx.Done())
×
1385

1386
        go wait.Until(func() {
×
1387
                c.resyncVpcNatConfig()
×
1388
        }, time.Second, ctx.Done())
×
1389

1390
        if c.config.GCInterval != 0 {
×
1391
                go wait.Until(func() {
×
1392
                        if err := c.markAndCleanLSP(); err != nil {
×
1393
                                klog.Errorf("gc lsp error: %v", err)
×
1394
                        }
×
1395
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1396
        }
1397

1398
        go wait.Until(func() {
×
1399
                if err := c.inspectPod(); err != nil {
×
1400
                        klog.Errorf("inspection error: %v", err)
×
1401
                }
×
1402
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1403

1404
        if c.config.EnableExternalVpc {
×
1405
                go wait.Until(func() {
×
1406
                        c.syncExternalVpc()
×
1407
                }, 5*time.Second, ctx.Done())
×
1408
        }
1409

1410
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1411
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1412
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1413
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1414

×
1415
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1416
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1417
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1418
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1419

×
1420
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1421
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1422
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1423

×
1424
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1425
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1426
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1427

×
1428
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1429
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1430
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1431

×
1432
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1433

×
1434
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1436
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1437

×
1438
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1439
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1441
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1442

×
1443
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1444
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1446
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1447

×
1448
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1450
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1451

×
1452
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1454
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1455

×
1456
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1457
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1458
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1459

×
1460
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1461
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1462
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1463

×
1464
        if c.config.EnableANP {
×
1465
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1467
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1468

×
1469
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1470
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1471
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1472

×
1473
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1474
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1475
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1476
        }
×
1477

1478
        if c.config.EnableDNSNameResolver {
×
1479
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1480
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1481
        }
×
1482

1483
        if c.config.EnableLiveMigrationOptimize {
×
1484
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1485
        }
×
1486

1487
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1488

×
1489
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1490
}
1491

1492
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1493
        for _, lsName := range subnets {
2✔
1494
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1495
                if err != nil {
1✔
1496
                        klog.Error(err)
×
1497
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1498
                }
×
1499

1500
                if !exist {
2✔
1501
                        return false, nil
1✔
1502
                }
1✔
1503
        }
1504

1505
        return true, nil
1✔
1506
}
1507

1508
func (c *Controller) initResourceOnce() {
×
1509
        c.registerSubnetMetrics()
×
1510

×
1511
        if err := c.initNodeChassis(); err != nil {
×
1512
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1513
        }
×
1514

1515
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1516
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1517
        }
×
1518
        if err := c.syncSecurityGroup(); err != nil {
×
1519
                util.LogFatalAndExit(err, "failed to sync security group")
×
1520
        }
×
1521

1522
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1523
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1524
        }
×
1525

1526
        if err := c.initVpcNatGw(); err != nil {
×
1527
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1528
        }
×
1529
        if c.config.EnableLb {
×
1530
                if err := c.initVpcDNSConfig(); err != nil {
×
1531
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1532
                }
×
1533
        }
1534

1535
        // remove resources in ovndb that not exist any more in kubernetes resources
1536
        // process gc at last in case of affecting other init process
1537
        if err := c.gc(); err != nil {
×
1538
                util.LogFatalAndExit(err, "failed to run gc")
×
1539
        }
×
1540
}
1541

1542
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1543
        item, shutdown := queue.Get()
×
1544
        if shutdown {
×
1545
                return false
×
1546
        }
×
1547

1548
        err := func(item T) error {
×
1549
                defer queue.Done(item)
×
1550
                if err := handler(item); err != nil {
×
1551
                        queue.AddRateLimited(item)
×
1552
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1553
                }
×
1554
                queue.Forget(item)
×
1555
                return nil
×
1556
        }(item)
1557
        if err != nil {
×
1558
                utilruntime.HandleError(err)
×
1559
                return true
×
1560
        }
×
1561
        return true
×
1562
}
1563

1564
func getWorkItemKey(obj any) string {
×
1565
        switch v := obj.(type) {
×
1566
        case string:
×
1567
                return v
×
1568
        case *vpcService:
×
1569
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1570
        case *AdminNetworkPolicyChangedDelta:
×
1571
                return v.key
×
1572
        case *SlrInfo:
×
1573
                return v.Name
×
1574
        default:
×
1575
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1576
                if err != nil {
×
1577
                        utilruntime.HandleError(err)
×
1578
                        return ""
×
1579
                }
×
1580
                return key
×
1581
        }
1582
}
1583

1584
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1585
        return func() {
×
1586
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1587
                }
×
1588
        }
1589
}
1590

1591
// apiResourceExists checks if all specified kinds exist in the given group version.
1592
// It returns true if all kinds are found, false otherwise.
1593
// Parameters:
1594
// - discoveryClient: The discovery client to use for querying API resources.
1595
// - gv: The group version string (e.g., "apps/v1").
1596
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1597
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1598
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1599
        if err != nil {
×
1600
                if k8serrors.IsNotFound(err) {
×
1601
                        return false, nil
×
1602
                }
×
1603
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1604
        }
1605

1606
        existingKinds := set.New[string]()
×
1607
        for _, apiResource := range apiResourceLists.APIResources {
×
1608
                existingKinds.Insert(apiResource.Kind)
×
1609
        }
×
1610

1611
        return existingKinds.HasAll(kinds...), nil
×
1612
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc