• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 20331782627

18 Dec 2025 09:10AM UTC coverage: 22.603% (-0.06%) from 22.661%
20331782627

push

github

web-flow
Add tests for VIP finalizer handling and subnet status updates (#6068)

* Add tests for VIP finalizer handling and subnet status updates

- Introduced a new test to verify that the subnet status is correctly updated when a VIP is created and deleted, ensuring that finalizers are properly handled.
- Added checks for both IPv4 and IPv6 protocols, including dual stack scenarios, to confirm that available and using IP counts and ranges are updated as expected.
- Enhanced the existing VIP creation test to wait for the finalizer to be added before proceeding with subnet status verification.
- Updated sleep durations to ensure sufficient time for status updates after VIP operations.

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>

* fix after review

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>

---------

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>

0 of 312 new or added lines in 10 files covered. (0.0%)

21 existing lines in 6 files now uncovered.

12052 of 53320 relevant lines covered (22.6%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.16
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync/atomic"
9
        "time"
10

11
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
12
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
13
        "github.com/puzpuzpuz/xsync/v4"
14
        "golang.org/x/time/rate"
15
        corev1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/labels"
18
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
19
        "k8s.io/apimachinery/pkg/util/wait"
20
        kubeinformers "k8s.io/client-go/informers"
21
        "k8s.io/client-go/kubernetes/scheme"
22
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
23
        appsv1 "k8s.io/client-go/listers/apps/v1"
24
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
25
        v1 "k8s.io/client-go/listers/core/v1"
26
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
27
        netv1 "k8s.io/client-go/listers/networking/v1"
28
        "k8s.io/client-go/tools/cache"
29
        "k8s.io/client-go/tools/record"
30
        "k8s.io/client-go/util/workqueue"
31
        "k8s.io/klog/v2"
32
        "k8s.io/utils/keymutex"
33
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
34
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
35
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
36
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
37
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
38

39
        "github.com/kubeovn/kube-ovn/pkg/informer"
40

41
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
42
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
43
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
44
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
45
        "github.com/kubeovn/kube-ovn/pkg/ovs"
46
        "github.com/kubeovn/kube-ovn/pkg/util"
47
)
48

49
const controllerAgentName = "kube-ovn-controller"
50

51
const (
52
        logicalSwitchKey              = "ls"
53
        logicalRouterKey              = "lr"
54
        portGroupKey                  = "pg"
55
        networkPolicyKey              = "np"
56
        sgKey                         = "sg"
57
        sgsKey                        = "security_groups"
58
        u2oKey                        = "u2o"
59
        adminNetworkPolicyKey         = "anp"
60
        baselineAdminNetworkPolicyKey = "banp"
61
        ippoolKey                     = "ippool"
62
        clusterNetworkPolicyKey       = "cnp"
63
)
64

65
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
66
type Controller struct {
67
        config *Configuration
68

69
        ipam           *ovnipam.IPAM
70
        namedPort      *NamedPort
71
        anpPrioNameMap map[int32]string
72
        anpNamePrioMap map[string]int32
73
        bnpPrioNameMap map[int32]string
74
        bnpNamePrioMap map[string]int32
75

76
        OVNNbClient ovs.NbClient
77
        OVNSbClient ovs.SbClient
78

79
        // ExternalGatewayType define external gateway type, centralized
80
        ExternalGatewayType string
81

82
        podsLister             v1.PodLister
83
        podsSynced             cache.InformerSynced
84
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
85
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
86
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
87
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
88
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
89
        podKeyMutex            keymutex.KeyMutex
90

91
        vpcsLister           kubeovnlister.VpcLister
92
        vpcSynced            cache.InformerSynced
93
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
94
        vpcLastPoliciesMap   *xsync.Map[string, string]
95
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
96
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
97
        vpcKeyMutex          keymutex.KeyMutex
98

99
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
100
        vpcNatGatewaySynced           cache.InformerSynced
101
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
102
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
103
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
104
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
105
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
106
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
107
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
108
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
109
        vpcNatGwKeyMutex              keymutex.KeyMutex
110
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
111

112
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
113
        vpcEgressGatewaySynced           cache.InformerSynced
114
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
115
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
116
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
117

118
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
119
        switchLBRuleSynced      cache.InformerSynced
120
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
121
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
122
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
123

124
        vpcDNSLister           kubeovnlister.VpcDnsLister
125
        vpcDNSSynced           cache.InformerSynced
126
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
127
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
128

129
        subnetsLister           kubeovnlister.SubnetLister
130
        subnetSynced            cache.InformerSynced
131
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
132
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
133
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
134
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
135
        subnetKeyMutex          keymutex.KeyMutex
136

137
        ippoolLister            kubeovnlister.IPPoolLister
138
        ippoolSynced            cache.InformerSynced
139
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
140
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
141
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
142
        ippoolKeyMutex          keymutex.KeyMutex
143

144
        ipsLister     kubeovnlister.IPLister
145
        ipSynced      cache.InformerSynced
146
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
147
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
148
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
149

150
        virtualIpsLister          kubeovnlister.VipLister
151
        virtualIpsSynced          cache.InformerSynced
152
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
153
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
154
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
155
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
156

157
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
158
        iptablesEipSynced      cache.InformerSynced
159
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
160
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
161
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
162
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
163

164
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
165
        iptablesFipSynced      cache.InformerSynced
166
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
168
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
169

170
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
171
        iptablesDnatRuleSynced      cache.InformerSynced
172
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
173
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
174
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
175

176
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
177
        iptablesSnatRuleSynced      cache.InformerSynced
178
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
179
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
180
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
181

182
        ovnEipsLister     kubeovnlister.OvnEipLister
183
        ovnEipSynced      cache.InformerSynced
184
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
185
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
186
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
187
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
188

189
        ovnFipsLister     kubeovnlister.OvnFipLister
190
        ovnFipSynced      cache.InformerSynced
191
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
193
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
196
        ovnSnatRuleSynced      cache.InformerSynced
197
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
199
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200

201
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
202
        ovnDnatRuleSynced      cache.InformerSynced
203
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
204
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
205
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
206

207
        providerNetworksLister kubeovnlister.ProviderNetworkLister
208
        providerNetworkSynced  cache.InformerSynced
209

210
        vlansLister     kubeovnlister.VlanLister
211
        vlanSynced      cache.InformerSynced
212
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
213
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
214
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
215
        vlanKeyMutex    keymutex.KeyMutex
216

217
        namespacesLister  v1.NamespaceLister
218
        namespacesSynced  cache.InformerSynced
219
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
220
        nsKeyMutex        keymutex.KeyMutex
221

222
        nodesLister     v1.NodeLister
223
        nodesSynced     cache.InformerSynced
224
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
225
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
226
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
227
        nodeKeyMutex    keymutex.KeyMutex
228

229
        servicesLister     v1.ServiceLister
230
        serviceSynced      cache.InformerSynced
231
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
232
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
233
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
234
        svcKeyMutex        keymutex.KeyMutex
235

236
        endpointSlicesLister          discoveryv1.EndpointSliceLister
237
        endpointSlicesSynced          cache.InformerSynced
238
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
239
        epKeyMutex                    keymutex.KeyMutex
240

241
        deploymentsLister appsv1.DeploymentLister
242
        deploymentsSynced cache.InformerSynced
243

244
        npsLister     netv1.NetworkPolicyLister
245
        npsSynced     cache.InformerSynced
246
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
247
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
248
        npKeyMutex    keymutex.KeyMutex
249

250
        sgsLister          kubeovnlister.SecurityGroupLister
251
        sgSynced           cache.InformerSynced
252
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
253
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
254
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
255
        sgKeyMutex         keymutex.KeyMutex
256

257
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
258
        qosPolicySynced      cache.InformerSynced
259
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
260
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
261
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
262

263
        configMapsLister v1.ConfigMapLister
264
        configMapsSynced cache.InformerSynced
265

266
        anpsLister     anplister.AdminNetworkPolicyLister
267
        anpsSynced     cache.InformerSynced
268
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
269
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
270
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
271
        anpKeyMutex    keymutex.KeyMutex
272

273
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
274
        dnsNameResolversSynced          cache.InformerSynced
275
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
276
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
277

278
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
279
        banpsSynced     cache.InformerSynced
280
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
281
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
282
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
283
        banpKeyMutex    keymutex.KeyMutex
284

285
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
286
        cnpsSynced     cache.InformerSynced
287
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
288
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
289
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
290
        cnpKeyMutex    keymutex.KeyMutex
291

292
        csrLister           certListerv1.CertificateSigningRequestLister
293
        csrSynced           cache.InformerSynced
294
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
295

296
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
297
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
298
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
299

300
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
301
        netAttachSynced          cache.InformerSynced
302
        netAttachInformerFactory netAttach.SharedInformerFactory
303

304
        recorder               record.EventRecorder
305
        informerFactory        kubeinformers.SharedInformerFactory
306
        cmInformerFactory      kubeinformers.SharedInformerFactory
307
        deployInformerFactory  kubeinformers.SharedInformerFactory
308
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
309
        anpInformerFactory     anpinformer.SharedInformerFactory
310

311
        // Database health check
312
        dbFailureCount int
313

314
        distributedSubnetNeedSync atomic.Bool
315
}
316

317
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
318
        if rateLimiter == nil {
2✔
319
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
320
        }
1✔
321
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
322
}
323

324
// Run creates and runs a new ovn controller
325
func Run(ctx context.Context, config *Configuration) {
×
326
        klog.V(4).Info("Creating event broadcaster")
×
327
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
328
        eventBroadcaster.StartLogging(klog.Infof)
×
329
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
330
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
331
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
332
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
333
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
334
        )
×
335

×
336
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
337
        if err != nil {
×
338
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
339
        }
×
340

341
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
342
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
343
                        listOption.AllowWatchBookmarks = true
×
344
                }))
×
345
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
346
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
347
                        listOption.AllowWatchBookmarks = true
×
348
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
349
        // deployment informer used to list/watch vpc egress gateway workloads
350
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
351
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
352
                        listOption.AllowWatchBookmarks = true
×
353
                        listOption.LabelSelector = selector.String()
×
354
                }))
×
355
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
356
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
357
                        listOption.AllowWatchBookmarks = true
×
358
                }))
×
359
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
360
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
361
                        listOption.AllowWatchBookmarks = true
×
362
                }))
×
363

364
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
365
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
366
                        listOption.AllowWatchBookmarks = true
×
367
                }),
×
368
        )
369

370
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
371

×
372
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
373
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
374
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
375
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
376
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
377
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
378
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
379
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
380
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
381
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
382
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
383
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
384
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
385
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
386
        podInformer := informerFactory.Core().V1().Pods()
×
387
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
388
        nodeInformer := informerFactory.Core().V1().Nodes()
×
389
        serviceInformer := informerFactory.Core().V1().Services()
×
390
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
391
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
392
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
393
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
394
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
395
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
396
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
397
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
398
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
399
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
400
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
401
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
402
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
403
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
404
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
405
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
406
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
407

×
408
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
409
        controller := &Controller{
×
410
                config:             config,
×
411
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
412
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
413
                ipam:               ovnipam.NewIPAM(),
×
414
                namedPort:          NewNamedPort(),
×
415

×
416
                vpcsLister:           vpcInformer.Lister(),
×
417
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
418
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
419
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
420
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
421
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
422
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
423

×
424
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
425
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
426
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
427
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
428
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
429
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
430
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
431
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
432
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
433
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
434
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
435
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
436
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
437
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
438
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
439
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
440
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
441

×
442
                subnetsLister:           subnetInformer.Lister(),
×
443
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
444
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
445
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
446
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
447
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
448
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
449

×
450
                ippoolLister:            ippoolInformer.Lister(),
×
451
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
452
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
453
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
454
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
455
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
456

×
457
                ipsLister:     ipInformer.Lister(),
×
458
                ipSynced:      ipInformer.Informer().HasSynced,
×
459
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
460
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
461
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
462

×
463
                virtualIpsLister:          virtualIPInformer.Lister(),
×
464
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
465
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
466
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
467
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
468
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
469

×
470
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
471
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
472
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
473
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
474
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
NEW
475
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
476

×
477
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
478
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
479
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
480
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
481
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
482

×
483
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
484
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
485
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
486
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
487
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
488

×
489
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
490
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
491
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
492
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
493
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
494

×
495
                vlansLister:     vlanInformer.Lister(),
×
496
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
497
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
498
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
499
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
500
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
501

×
502
                providerNetworksLister: providerNetworkInformer.Lister(),
×
503
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
504

×
505
                podsLister:          podInformer.Lister(),
×
506
                podsSynced:          podInformer.Informer().HasSynced,
×
507
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
508
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
509
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
510
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
511
                                Name:          "DeletePod",
×
512
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
513
                        },
×
514
                ),
×
515
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
516
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
517

×
518
                namespacesLister:  namespaceInformer.Lister(),
×
519
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
520
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
521
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
522

×
523
                nodesLister:     nodeInformer.Lister(),
×
524
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
525
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
526
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
527
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
528
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
529

×
530
                servicesLister:     serviceInformer.Lister(),
×
531
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
532
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
533
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
534
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
535
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
536

×
537
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
538
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
539
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
540
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
541

×
542
                deploymentsLister: deploymentInformer.Lister(),
×
543
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
544

×
545
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
546
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
547
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
548
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
549
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
550

×
551
                configMapsLister: configMapInformer.Lister(),
×
552
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
553

×
554
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
555
                sgsLister:          sgInformer.Lister(),
×
556
                sgSynced:           sgInformer.Informer().HasSynced,
×
557
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
558
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
559
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
560

×
561
                ovnEipsLister:     ovnEipInformer.Lister(),
×
562
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
563
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
564
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
565
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
NEW
566
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
567

×
568
                ovnFipsLister:     ovnFipInformer.Lister(),
×
569
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
570
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
571
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
572
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
573

×
574
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
575
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
576
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
577
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
578
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
579

×
580
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
581
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
582
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
583
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
584
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
585

×
586
                csrLister:           csrInformer.Lister(),
×
587
                csrSynced:           csrInformer.Informer().HasSynced,
×
588
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
589

×
590
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
591
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
592
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
593

×
594
                netAttachLister:          netAttachInformer.Lister(),
×
595
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
596
                netAttachInformerFactory: attachNetInformerFactory,
×
597

×
598
                recorder:               recorder,
×
599
                informerFactory:        informerFactory,
×
600
                cmInformerFactory:      cmInformerFactory,
×
601
                deployInformerFactory:  deployInformerFactory,
×
602
                kubeovnInformerFactory: kubeovnInformerFactory,
×
603
                anpInformerFactory:     anpInformerFactory,
×
604
        }
×
605

×
606
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
607
                config.OvnNbAddr,
×
608
                config.OvnTimeout,
×
609
                config.OvsDbConnectTimeout,
×
610
                config.OvsDbInactivityTimeout,
×
611
                config.OvsDbConnectMaxRetry,
×
612
        ); err != nil {
×
613
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
614
        }
×
615
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
616
                config.OvnSbAddr,
×
617
                config.OvnTimeout,
×
618
                config.OvsDbConnectTimeout,
×
619
                config.OvsDbInactivityTimeout,
×
620
                config.OvsDbConnectMaxRetry,
×
621
        ); err != nil {
×
622
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
623
        }
×
624
        if config.EnableLb {
×
625
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
626
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
627
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
628
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
629
                        "DeleteSwitchLBRule",
×
630
                        workqueue.NewTypedMaxOfRateLimiter(
×
631
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
632
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
633
                        ),
×
634
                )
×
635
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
636
                        "UpdateSwitchLBRule",
×
637
                        workqueue.NewTypedMaxOfRateLimiter(
×
638
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
639
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
640
                        ),
×
641
                )
×
642

×
643
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
644
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
645
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
646
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
647
        }
×
648

649
        if config.EnableNP {
×
650
                controller.npsLister = npInformer.Lister()
×
651
                controller.npsSynced = npInformer.Informer().HasSynced
×
652
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
653
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
654
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
655
        }
×
656

657
        if config.EnableANP {
×
658
                controller.anpsLister = anpInformer.Lister()
×
659
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
660
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
661
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
662
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
663
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
664

×
665
                controller.banpsLister = banpInformer.Lister()
×
666
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
667
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
668
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
669
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
670
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
671

×
672
                controller.cnpsLister = cnpInformer.Lister()
×
673
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
674
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
675
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
676
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
677
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
678
        }
×
679

680
        if config.EnableDNSNameResolver {
×
681
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
682
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
683
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
684
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
685
        }
×
686

687
        defer controller.shutdown()
×
688
        klog.Info("Starting OVN controller")
×
689

×
690
        // Wait for the caches to be synced before starting workers
×
691
        controller.informerFactory.Start(ctx.Done())
×
692
        controller.cmInformerFactory.Start(ctx.Done())
×
693
        controller.deployInformerFactory.Start(ctx.Done())
×
694
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
695
        controller.anpInformerFactory.Start(ctx.Done())
×
696
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
697
        controller.StartNetAttachInformerFactory(ctx)
×
698

×
699
        klog.Info("Waiting for informer caches to sync")
×
700
        cacheSyncs := []cache.InformerSynced{
×
701
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
702
                controller.vpcSynced, controller.subnetSynced,
×
703
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
704
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
705
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
706
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
707
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
708
                controller.ovnDnatRuleSynced,
×
709
        }
×
710
        if controller.config.EnableLb {
×
711
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
712
        }
×
713
        if controller.config.EnableNP {
×
714
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
715
        }
×
716
        if controller.config.EnableANP {
×
717
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
718
        }
×
719
        if controller.config.EnableDNSNameResolver {
×
720
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
721
        }
×
722

723
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
724
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
725
        }
×
726

727
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
728
                AddFunc:    controller.enqueueAddPod,
×
729
                DeleteFunc: controller.enqueueDeletePod,
×
730
                UpdateFunc: controller.enqueueUpdatePod,
×
731
        }); err != nil {
×
732
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
733
        }
×
734

735
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
736
                AddFunc:    controller.enqueueAddNamespace,
×
737
                UpdateFunc: controller.enqueueUpdateNamespace,
×
738
                DeleteFunc: controller.enqueueDeleteNamespace,
×
739
        }); err != nil {
×
740
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
741
        }
×
742

743
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
744
                AddFunc:    controller.enqueueAddNode,
×
745
                UpdateFunc: controller.enqueueUpdateNode,
×
746
                DeleteFunc: controller.enqueueDeleteNode,
×
747
        }); err != nil {
×
748
                util.LogFatalAndExit(err, "failed to add node event handler")
×
749
        }
×
750

751
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
752
                AddFunc:    controller.enqueueAddService,
×
753
                DeleteFunc: controller.enqueueDeleteService,
×
754
                UpdateFunc: controller.enqueueUpdateService,
×
755
        }); err != nil {
×
756
                util.LogFatalAndExit(err, "failed to add service event handler")
×
757
        }
×
758

759
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
760
                AddFunc:    controller.enqueueAddEndpointSlice,
×
761
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
762
        }); err != nil {
×
763
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
764
        }
×
765

766
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
767
                AddFunc:    controller.enqueueAddDeployment,
×
768
                UpdateFunc: controller.enqueueUpdateDeployment,
×
769
        }); err != nil {
×
770
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
771
        }
×
772

773
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
774
                AddFunc:    controller.enqueueAddVpc,
×
775
                UpdateFunc: controller.enqueueUpdateVpc,
×
776
                DeleteFunc: controller.enqueueDelVpc,
×
777
        }); err != nil {
×
778
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
779
        }
×
780

781
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
782
                AddFunc:    controller.enqueueAddVpcNatGw,
×
783
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
784
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
785
        }); err != nil {
×
786
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
787
        }
×
788

789
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
790
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
791
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
792
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
793
        }); err != nil {
×
794
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
795
        }
×
796

797
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
798
                AddFunc:    controller.enqueueAddSubnet,
×
799
                UpdateFunc: controller.enqueueUpdateSubnet,
×
800
                DeleteFunc: controller.enqueueDeleteSubnet,
×
801
        }); err != nil {
×
802
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
803
        }
×
804

805
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
806
                AddFunc:    controller.enqueueAddIPPool,
×
807
                UpdateFunc: controller.enqueueUpdateIPPool,
×
808
                DeleteFunc: controller.enqueueDeleteIPPool,
×
809
        }); err != nil {
×
810
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
811
        }
×
812

813
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
814
                AddFunc:    controller.enqueueAddIP,
×
815
                UpdateFunc: controller.enqueueUpdateIP,
×
816
                DeleteFunc: controller.enqueueDelIP,
×
817
        }); err != nil {
×
818
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
819
        }
×
820

821
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
822
                AddFunc:    controller.enqueueAddVlan,
×
823
                DeleteFunc: controller.enqueueDelVlan,
×
824
                UpdateFunc: controller.enqueueUpdateVlan,
×
825
        }); err != nil {
×
826
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
827
        }
×
828

829
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
830
                AddFunc:    controller.enqueueAddSg,
×
831
                DeleteFunc: controller.enqueueDeleteSg,
×
832
                UpdateFunc: controller.enqueueUpdateSg,
×
833
        }); err != nil {
×
834
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
835
        }
×
836

837
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
838
                AddFunc:    controller.enqueueAddVirtualIP,
×
839
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
840
                DeleteFunc: controller.enqueueDelVirtualIP,
×
841
        }); err != nil {
×
842
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
843
        }
×
844

845
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
846
                AddFunc:    controller.enqueueAddIptablesEip,
×
847
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
848
                DeleteFunc: controller.enqueueDelIptablesEip,
×
849
        }); err != nil {
×
850
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
851
        }
×
852

853
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
854
                AddFunc:    controller.enqueueAddIptablesFip,
×
855
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
856
                DeleteFunc: controller.enqueueDelIptablesFip,
×
857
        }); err != nil {
×
858
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
859
        }
×
860

861
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
862
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
863
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
864
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
865
        }); err != nil {
×
866
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
867
        }
×
868

869
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
870
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
871
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
872
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
873
        }); err != nil {
×
874
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
875
        }
×
876

877
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
878
                AddFunc:    controller.enqueueAddOvnEip,
×
879
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
880
                DeleteFunc: controller.enqueueDelOvnEip,
×
881
        }); err != nil {
×
882
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
883
        }
×
884

885
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
886
                AddFunc:    controller.enqueueAddOvnFip,
×
887
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
888
                DeleteFunc: controller.enqueueDelOvnFip,
×
889
        }); err != nil {
×
890
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
891
        }
×
892

893
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
894
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
895
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
896
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
897
        }); err != nil {
×
898
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
899
        }
×
900

901
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
902
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
903
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
904
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
905
        }); err != nil {
×
906
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
907
        }
×
908

909
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
910
                AddFunc:    controller.enqueueAddQoSPolicy,
×
911
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
912
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
913
        }); err != nil {
×
914
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
915
        }
×
916

917
        if config.EnableLb {
×
918
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
919
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
920
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
921
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
922
                }); err != nil {
×
923
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
924
                }
×
925

926
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
927
                        AddFunc:    controller.enqueueAddVpcDNS,
×
928
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
929
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
930
                }); err != nil {
×
931
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
932
                }
×
933
        }
934

935
        if config.EnableNP {
×
936
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
937
                        AddFunc:    controller.enqueueAddNp,
×
938
                        UpdateFunc: controller.enqueueUpdateNp,
×
939
                        DeleteFunc: controller.enqueueDeleteNp,
×
940
                }); err != nil {
×
941
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
942
                }
×
943
        }
944

945
        if config.EnableANP {
×
946
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
947
                        AddFunc:    controller.enqueueAddAnp,
×
948
                        UpdateFunc: controller.enqueueUpdateAnp,
×
949
                        DeleteFunc: controller.enqueueDeleteAnp,
×
950
                }); err != nil {
×
951
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
952
                }
×
953

954
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
955
                        AddFunc:    controller.enqueueAddBanp,
×
956
                        UpdateFunc: controller.enqueueUpdateBanp,
×
957
                        DeleteFunc: controller.enqueueDeleteBanp,
×
958
                }); err != nil {
×
959
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
960
                }
×
961

962
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
963
                        AddFunc:    controller.enqueueAddCnp,
×
964
                        UpdateFunc: controller.enqueueUpdateCnp,
×
965
                        DeleteFunc: controller.enqueueDeleteCnp,
×
966
                }); err != nil {
×
967
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
968
                }
×
969

970
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
971
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
972
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
973
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
974
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
975
        }
976

977
        if config.EnableDNSNameResolver {
×
978
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
979
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
980
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
981
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
982
                }); err != nil {
×
983
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
984
                }
×
985
        }
986

987
        if config.EnableOVNIPSec {
×
988
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
989
                        AddFunc:    controller.enqueueAddCsr,
×
990
                        UpdateFunc: controller.enqueueUpdateCsr,
×
991
                        // no need to add delete func for csr
×
992
                }); err != nil {
×
993
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
994
                }
×
995
        }
996

997
        controller.Run(ctx)
×
998
}
999

1000
// Run will set up the event handlers for types we are interested in, as well
1001
// as syncing informer caches and starting workers. It will block until stopCh
1002
// is closed, at which point it will shutdown the workqueue and wait for
1003
// workers to finish processing their current work items.
1004
func (c *Controller) Run(ctx context.Context) {
×
1005
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1006
        // Otherwise, the init process should be placed after all workers have already started working
×
1007
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1008
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1009
        }
×
1010

1011
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1012
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1013
        }
×
1014

1015
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1016
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1017
        }
×
1018

1019
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1020
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1021
        }
×
1022

1023
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1024
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1025
        }
×
1026

1027
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1028
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1029
        }
×
1030

1031
        if err := c.InitOVN(); err != nil {
×
1032
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1033
        }
×
1034

1035
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1036
        if err := c.syncIPCR(); err != nil {
×
1037
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1038
        }
×
1039

1040
        if err := c.syncFinalizers(); err != nil {
×
1041
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1042
        }
×
1043

1044
        if err := c.InitIPAM(); err != nil {
×
1045
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1046
        }
×
1047

1048
        if err := c.syncNodeRoutes(); err != nil {
×
1049
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1050
        }
×
1051

1052
        if err := c.syncSubnetCR(); err != nil {
×
1053
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1054
        }
×
1055

1056
        if err := c.syncVlanCR(); err != nil {
×
1057
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1058
        }
×
1059

1060
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1061
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1062
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1063
                }
×
1064
        }
1065

1066
        // start workers to do all the network operations
1067
        c.startWorkers(ctx)
×
1068

×
1069
        c.initResourceOnce()
×
1070
        <-ctx.Done()
×
1071
        klog.Info("Shutting down workers")
×
1072
}
1073

1074
func (c *Controller) dbStatus() {
×
1075
        const maxFailures = 5
×
1076

×
1077
        done := make(chan error, 2)
×
1078
        go func() {
×
1079
                done <- c.OVNNbClient.Echo(context.Background())
×
1080
        }()
×
1081
        go func() {
×
1082
                done <- c.OVNSbClient.Echo(context.Background())
×
1083
        }()
×
1084

1085
        resultsReceived := 0
×
1086
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1087

×
1088
        for resultsReceived < 2 {
×
1089
                select {
×
1090
                case err := <-done:
×
1091
                        resultsReceived++
×
1092
                        if err != nil {
×
1093
                                c.dbFailureCount++
×
1094
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1095
                                if c.dbFailureCount >= maxFailures {
×
1096
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1097
                                }
×
1098
                                return
×
1099
                        }
1100
                case <-timeout:
×
1101
                        c.dbFailureCount++
×
1102
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1103
                        if c.dbFailureCount >= maxFailures {
×
1104
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1105
                        }
×
1106
                        return
×
1107
                }
1108
        }
1109

1110
        if c.dbFailureCount > 0 {
×
1111
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1112
                c.dbFailureCount = 0
×
1113
        }
×
1114
}
1115

1116
func (c *Controller) shutdown() {
×
1117
        utilruntime.HandleCrash()
×
1118

×
1119
        c.addOrUpdatePodQueue.ShutDown()
×
1120
        c.deletePodQueue.ShutDown()
×
1121
        c.updatePodSecurityQueue.ShutDown()
×
1122

×
1123
        c.addNamespaceQueue.ShutDown()
×
1124

×
1125
        c.addOrUpdateSubnetQueue.ShutDown()
×
1126
        c.deleteSubnetQueue.ShutDown()
×
1127
        c.updateSubnetStatusQueue.ShutDown()
×
1128
        c.syncVirtualPortsQueue.ShutDown()
×
1129

×
1130
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1131
        c.updateIPPoolStatusQueue.ShutDown()
×
1132
        c.deleteIPPoolQueue.ShutDown()
×
1133

×
1134
        c.addNodeQueue.ShutDown()
×
1135
        c.updateNodeQueue.ShutDown()
×
1136
        c.deleteNodeQueue.ShutDown()
×
1137

×
1138
        c.addServiceQueue.ShutDown()
×
1139
        c.deleteServiceQueue.ShutDown()
×
1140
        c.updateServiceQueue.ShutDown()
×
1141
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1142

×
1143
        c.addVlanQueue.ShutDown()
×
1144
        c.delVlanQueue.ShutDown()
×
1145
        c.updateVlanQueue.ShutDown()
×
1146

×
1147
        c.addOrUpdateVpcQueue.ShutDown()
×
1148
        c.updateVpcStatusQueue.ShutDown()
×
1149
        c.delVpcQueue.ShutDown()
×
1150

×
1151
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1152
        c.initVpcNatGatewayQueue.ShutDown()
×
1153
        c.delVpcNatGatewayQueue.ShutDown()
×
1154
        c.updateVpcEipQueue.ShutDown()
×
1155
        c.updateVpcFloatingIPQueue.ShutDown()
×
1156
        c.updateVpcDnatQueue.ShutDown()
×
1157
        c.updateVpcSnatQueue.ShutDown()
×
1158
        c.updateVpcSubnetQueue.ShutDown()
×
1159

×
1160
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1161
        c.delVpcEgressGatewayQueue.ShutDown()
×
1162

×
1163
        if c.config.EnableLb {
×
1164
                c.addSwitchLBRuleQueue.ShutDown()
×
1165
                c.delSwitchLBRuleQueue.ShutDown()
×
1166
                c.updateSwitchLBRuleQueue.ShutDown()
×
1167

×
1168
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1169
                c.delVpcDNSQueue.ShutDown()
×
1170
        }
×
1171

1172
        c.addIPQueue.ShutDown()
×
1173
        c.updateIPQueue.ShutDown()
×
1174
        c.delIPQueue.ShutDown()
×
1175

×
1176
        c.addVirtualIPQueue.ShutDown()
×
1177
        c.updateVirtualIPQueue.ShutDown()
×
1178
        c.updateVirtualParentsQueue.ShutDown()
×
1179
        c.delVirtualIPQueue.ShutDown()
×
1180

×
1181
        c.addIptablesEipQueue.ShutDown()
×
1182
        c.updateIptablesEipQueue.ShutDown()
×
1183
        c.resetIptablesEipQueue.ShutDown()
×
1184
        c.delIptablesEipQueue.ShutDown()
×
1185

×
1186
        c.addIptablesFipQueue.ShutDown()
×
1187
        c.updateIptablesFipQueue.ShutDown()
×
1188
        c.delIptablesFipQueue.ShutDown()
×
1189

×
1190
        c.addIptablesDnatRuleQueue.ShutDown()
×
1191
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1192
        c.delIptablesDnatRuleQueue.ShutDown()
×
1193

×
1194
        c.addIptablesSnatRuleQueue.ShutDown()
×
1195
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1196
        c.delIptablesSnatRuleQueue.ShutDown()
×
1197

×
1198
        c.addQoSPolicyQueue.ShutDown()
×
1199
        c.updateQoSPolicyQueue.ShutDown()
×
1200
        c.delQoSPolicyQueue.ShutDown()
×
1201

×
1202
        c.addOvnEipQueue.ShutDown()
×
1203
        c.updateOvnEipQueue.ShutDown()
×
1204
        c.resetOvnEipQueue.ShutDown()
×
1205
        c.delOvnEipQueue.ShutDown()
×
1206

×
1207
        c.addOvnFipQueue.ShutDown()
×
1208
        c.updateOvnFipQueue.ShutDown()
×
1209
        c.delOvnFipQueue.ShutDown()
×
1210

×
1211
        c.addOvnSnatRuleQueue.ShutDown()
×
1212
        c.updateOvnSnatRuleQueue.ShutDown()
×
1213
        c.delOvnSnatRuleQueue.ShutDown()
×
1214

×
1215
        c.addOvnDnatRuleQueue.ShutDown()
×
1216
        c.updateOvnDnatRuleQueue.ShutDown()
×
1217
        c.delOvnDnatRuleQueue.ShutDown()
×
1218

×
1219
        if c.config.EnableNP {
×
1220
                c.updateNpQueue.ShutDown()
×
1221
                c.deleteNpQueue.ShutDown()
×
1222
        }
×
1223
        if c.config.EnableANP {
×
1224
                c.addAnpQueue.ShutDown()
×
1225
                c.updateAnpQueue.ShutDown()
×
1226
                c.deleteAnpQueue.ShutDown()
×
1227

×
1228
                c.addBanpQueue.ShutDown()
×
1229
                c.updateBanpQueue.ShutDown()
×
1230
                c.deleteBanpQueue.ShutDown()
×
1231

×
1232
                c.addCnpQueue.ShutDown()
×
1233
                c.updateCnpQueue.ShutDown()
×
1234
                c.deleteCnpQueue.ShutDown()
×
1235
        }
×
1236

1237
        if c.config.EnableDNSNameResolver {
×
1238
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1239
                c.deleteDNSNameResolverQueue.ShutDown()
×
1240
        }
×
1241

1242
        c.addOrUpdateSgQueue.ShutDown()
×
1243
        c.delSgQueue.ShutDown()
×
1244
        c.syncSgPortsQueue.ShutDown()
×
1245

×
1246
        c.addOrUpdateCsrQueue.ShutDown()
×
1247

×
1248
        if c.config.EnableLiveMigrationOptimize {
×
1249
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1250
        }
×
1251
}
1252

1253
func (c *Controller) startWorkers(ctx context.Context) {
×
1254
        klog.Info("Starting workers")
×
1255

×
1256
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1257
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1258
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1259

×
1260
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1261
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1262
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1263
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1264
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1265
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1266
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1267
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1268
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1269
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1271
        // add default and join subnet and wait them ready
×
1272
        for range c.config.WorkerNum {
×
1273
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1274
        }
×
1275
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1276
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1277
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1278
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1279
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1280
                klog.Infof("wait for subnets %v ready", subnets)
×
1281

×
1282
                return c.allSubnetReady(subnets...)
×
1283
        })
×
1284
        if err != nil {
×
1285
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1286
        }
×
1287

1288
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1289
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1290
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1291

×
1292
        // run node worker before handle any pods
×
1293
        for range c.config.WorkerNum {
×
1294
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1295
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1296
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1297
        }
×
1298
        for {
×
1299
                ready := true
×
1300
                time.Sleep(3 * time.Second)
×
1301
                nodes, err := c.nodesLister.List(labels.Everything())
×
1302
                if err != nil {
×
1303
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1304
                }
×
1305
                for _, node := range nodes {
×
1306
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1307
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1308
                                ready = false
×
1309
                                break
×
1310
                        }
1311
                }
1312
                if ready {
×
1313
                        break
×
1314
                }
1315
        }
1316

1317
        if c.config.EnableLb {
×
1318
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1319
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1320
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1321

×
1322
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1323
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1324
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1325

×
1326
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1327
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1328
                go wait.Until(func() {
×
1329
                        c.resyncVpcDNSConfig()
×
1330
                }, 5*time.Second, ctx.Done())
×
1331
        }
1332

1333
        for range c.config.WorkerNum {
×
1334
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1335
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1336
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1337

×
1338
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1339
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1340
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1341
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1342
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1343

×
1344
                if c.config.EnableLb {
×
1345
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1346
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1347
                }
×
1348

1349
                if c.config.EnableNP {
×
1350
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1351
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1352
                }
×
1353

1354
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1355
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1356
        }
1357

1358
        if c.config.EnableEipSnat {
×
1359
                go wait.Until(func() {
×
1360
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1361
                        c.resyncExternalGateway()
×
1362
                }, time.Second, ctx.Done())
×
1363

1364
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1365
                c.OVNNbClient.MonitorBFD()
×
1366
        }
1367
        // TODO: we should merge these two vpc nat config into one config and resync them together
1368
        go wait.Until(func() {
×
1369
                c.resyncVpcNatGwConfig()
×
1370
        }, time.Second, ctx.Done())
×
1371

1372
        go wait.Until(func() {
×
1373
                c.resyncVpcNatConfig()
×
1374
        }, time.Second, ctx.Done())
×
1375

1376
        if c.config.GCInterval != 0 {
×
1377
                go wait.Until(func() {
×
1378
                        if err := c.markAndCleanLSP(); err != nil {
×
1379
                                klog.Errorf("gc lsp error: %v", err)
×
1380
                        }
×
1381
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1382
        }
1383

1384
        go wait.Until(func() {
×
1385
                if err := c.inspectPod(); err != nil {
×
1386
                        klog.Errorf("inspection error: %v", err)
×
1387
                }
×
1388
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1389

1390
        if c.config.EnableExternalVpc {
×
1391
                go wait.Until(func() {
×
1392
                        c.syncExternalVpc()
×
1393
                }, 5*time.Second, ctx.Done())
×
1394
        }
1395

1396
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1397
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1398
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1399
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1400

×
1401
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1402
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1403
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1404
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1405

×
1406
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1407
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1408
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1409

×
1410
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1411
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1412
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1413

×
1414
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1415
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1416
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1417

×
1418
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1419

×
1420
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1421
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1422
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1423

×
1424
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1425
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1426
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1427
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1428

×
1429
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1430
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1431
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1432
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1433

×
1434
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1436
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1437

×
1438
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1439
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1441

×
1442
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1443
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1444
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1445

×
1446
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1447
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1448
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1449

×
1450
        if c.config.EnableANP {
×
1451
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1452
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1453
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1454

×
1455
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1456
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1457
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1458

×
1459
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1460
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1461
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1462
        }
×
1463

1464
        if c.config.EnableDNSNameResolver {
×
1465
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1467
        }
×
1468

1469
        if c.config.EnableLiveMigrationOptimize {
×
1470
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1471
        }
×
1472

1473
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1474

×
1475
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1476
}
1477

1478
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1479
        for _, lsName := range subnets {
2✔
1480
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1481
                if err != nil {
1✔
1482
                        klog.Error(err)
×
1483
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1484
                }
×
1485

1486
                if !exist {
2✔
1487
                        return false, nil
1✔
1488
                }
1✔
1489
        }
1490

1491
        return true, nil
1✔
1492
}
1493

1494
func (c *Controller) initResourceOnce() {
×
1495
        c.registerSubnetMetrics()
×
1496

×
1497
        if err := c.initNodeChassis(); err != nil {
×
1498
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1499
        }
×
1500

1501
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1502
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1503
        }
×
1504
        if err := c.syncSecurityGroup(); err != nil {
×
1505
                util.LogFatalAndExit(err, "failed to sync security group")
×
1506
        }
×
1507

1508
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1509
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1510
        }
×
1511

1512
        if err := c.initVpcNatGw(); err != nil {
×
1513
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1514
        }
×
1515
        if c.config.EnableLb {
×
1516
                if err := c.initVpcDNSConfig(); err != nil {
×
1517
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1518
                }
×
1519
        }
1520

1521
        // remove resources in ovndb that not exist any more in kubernetes resources
1522
        // process gc at last in case of affecting other init process
1523
        if err := c.gc(); err != nil {
×
1524
                util.LogFatalAndExit(err, "failed to run gc")
×
1525
        }
×
1526
}
1527

1528
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1529
        item, shutdown := queue.Get()
×
1530
        if shutdown {
×
1531
                return false
×
1532
        }
×
1533

1534
        err := func(item T) error {
×
1535
                defer queue.Done(item)
×
1536
                if err := handler(item); err != nil {
×
1537
                        queue.AddRateLimited(item)
×
1538
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1539
                }
×
1540
                queue.Forget(item)
×
1541
                return nil
×
1542
        }(item)
1543
        if err != nil {
×
1544
                utilruntime.HandleError(err)
×
1545
                return true
×
1546
        }
×
1547
        return true
×
1548
}
1549

1550
func getWorkItemKey(obj any) string {
×
1551
        switch v := obj.(type) {
×
1552
        case string:
×
1553
                return v
×
1554
        case *vpcService:
×
1555
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1556
        case *AdminNetworkPolicyChangedDelta:
×
1557
                return v.key
×
1558
        case *SlrInfo:
×
1559
                return v.Name
×
1560
        default:
×
1561
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1562
                if err != nil {
×
1563
                        utilruntime.HandleError(err)
×
1564
                        return ""
×
1565
                }
×
1566
                return key
×
1567
        }
1568
}
1569

1570
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1571
        return func() {
×
1572
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1573
                }
×
1574
        }
1575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc