• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 20405539887

21 Dec 2025 05:53AM UTC coverage: 22.596% (-0.005%) from 22.601%
20405539887

push

github

web-flow
refactor: unify type constants (#6081)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

7 of 67 new or added lines in 13 files covered. (10.45%)

7 existing lines in 5 files now uncovered.

12056 of 53354 relevant lines covered (22.6%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.15
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync/atomic"
9
        "time"
10

11
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
12
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
13
        "github.com/puzpuzpuz/xsync/v4"
14
        "golang.org/x/time/rate"
15
        corev1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        "k8s.io/client-go/discovery"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes/scheme"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        appsv1 "k8s.io/client-go/listers/apps/v1"
26
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
27
        v1 "k8s.io/client-go/listers/core/v1"
28
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
29
        netv1 "k8s.io/client-go/listers/networking/v1"
30
        "k8s.io/client-go/tools/cache"
31
        "k8s.io/client-go/tools/record"
32
        "k8s.io/client-go/util/workqueue"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/keymutex"
35
        "k8s.io/utils/set"
36
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
37
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
38
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
39
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
40
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
41

42
        "github.com/kubeovn/kube-ovn/pkg/informer"
43

44
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
45
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
46
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
47
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
48
        "github.com/kubeovn/kube-ovn/pkg/ovs"
49
        "github.com/kubeovn/kube-ovn/pkg/util"
50
)
51

52
// controllerAgentName is the component name set on the EventSource of the
// event recorder created in Run.
const controllerAgentName = "kube-ovn-controller"
53

54
// Short string keys, one per resource kind (logical switch, logical router,
// port group, network policy, security group, ...).
// NOTE(review): none of these are referenced in the visible part of this file;
// presumably they are used as tag/external-ID keys elsewhere in the package — confirm.
const (
55
        logicalSwitchKey              = "ls"
56
        logicalRouterKey              = "lr"
57
        portGroupKey                  = "pg"
58
        networkPolicyKey              = "np"
59
        sgKey                         = "sg"
60
        sgsKey                        = "security_groups"
61
        u2oKey                        = "u2o"
62
        adminNetworkPolicyKey         = "anp"
63
        baselineAdminNetworkPolicyKey = "banp"
64
        ippoolKey                     = "ippool"
65
        clusterNetworkPolicyKey       = "cnp"
66
)
67

68
// Controller is the main kube-ovn controller. It watches namespaces, pods,
// nodes, services and endpoints (among other resources) and operates OVN.
//
// For most watched resource kinds the struct holds a lister, an
// informer-synced check, one or more rate-limited work queues and, for some
// kinds, a keyed mutex. The struct contains an atomic.Bool, so it must be
// used through a pointer and not copied by value.
69
type Controller struct {
70
        config *Configuration
71

72
        ipam           *ovnipam.IPAM
73
        namedPort      *NamedPort
        // NOTE(review): the four maps below look like priority<->name lookups for
        // admin (anp) and baseline admin (bnp) network policies — inferred from
        // the field names only; confirm against their users.
74
        anpPrioNameMap map[int32]string
75
        anpNamePrioMap map[string]int32
76
        bnpPrioNameMap map[int32]string
77
        bnpNamePrioMap map[string]int32
78

        // Clients for the OVN northbound (Nb) and southbound (Sb) databases.
79
        OVNNbClient ovs.NbClient
80
        OVNSbClient ovs.SbClient
81

82
        // ExternalGatewayType defines the external gateway type (e.g. centralized).
83
        ExternalGatewayType string
84

85
        podsLister             v1.PodLister
86
        podsSynced             cache.InformerSynced
87
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
88
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
89
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
90
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
91
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
92
        podKeyMutex            keymutex.KeyMutex
93

94
        vpcsLister           kubeovnlister.VpcLister
95
        vpcSynced            cache.InformerSynced
96
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
97
        vpcLastPoliciesMap   *xsync.Map[string, string]
98
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
99
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
100
        vpcKeyMutex          keymutex.KeyMutex
101

102
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
103
        vpcNatGatewaySynced           cache.InformerSynced
104
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
105
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
106
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
107
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
108
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
109
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
112
        vpcNatGwKeyMutex              keymutex.KeyMutex
113
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
114

115
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
116
        vpcEgressGatewaySynced           cache.InformerSynced
117
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
118
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
119
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
120

121
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
122
        switchLBRuleSynced      cache.InformerSynced
123
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
124
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
125
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
126

127
        vpcDNSLister           kubeovnlister.VpcDnsLister
128
        vpcDNSSynced           cache.InformerSynced
129
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
130
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
131

132
        subnetsLister           kubeovnlister.SubnetLister
133
        subnetSynced            cache.InformerSynced
134
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
135
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
136
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
137
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
138
        subnetKeyMutex          keymutex.KeyMutex
139

140
        ippoolLister            kubeovnlister.IPPoolLister
141
        ippoolSynced            cache.InformerSynced
142
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
143
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
144
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
145
        ippoolKeyMutex          keymutex.KeyMutex
146

147
        ipsLister     kubeovnlister.IPLister
148
        ipSynced      cache.InformerSynced
149
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
150
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
151
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
152

153
        virtualIpsLister          kubeovnlister.VipLister
154
        virtualIpsSynced          cache.InformerSynced
155
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
156
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
157
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
158
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
159

160
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
161
        iptablesEipSynced      cache.InformerSynced
162
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
163
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
164
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
165
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
166

167
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
168
        iptablesFipSynced      cache.InformerSynced
169
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
170
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
171
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
172

173
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
174
        iptablesDnatRuleSynced      cache.InformerSynced
175
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
176
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
177
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
178

179
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
180
        iptablesSnatRuleSynced      cache.InformerSynced
181
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
182
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
183
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
184

185
        ovnEipsLister     kubeovnlister.OvnEipLister
186
        ovnEipSynced      cache.InformerSynced
187
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
188
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
189
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
190
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
191

192
        ovnFipsLister     kubeovnlister.OvnFipLister
193
        ovnFipSynced      cache.InformerSynced
194
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
195
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
196
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
197

198
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
199
        ovnSnatRuleSynced      cache.InformerSynced
200
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
201
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
202
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
203

204
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
205
        ovnDnatRuleSynced      cache.InformerSynced
206
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
207
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
208
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
209

210
        providerNetworksLister kubeovnlister.ProviderNetworkLister
211
        providerNetworkSynced  cache.InformerSynced
212

213
        vlansLister     kubeovnlister.VlanLister
214
        vlanSynced      cache.InformerSynced
215
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
216
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
217
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
218
        vlanKeyMutex    keymutex.KeyMutex
219

220
        namespacesLister  v1.NamespaceLister
221
        namespacesSynced  cache.InformerSynced
222
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
223
        nsKeyMutex        keymutex.KeyMutex
224

225
        nodesLister     v1.NodeLister
226
        nodesSynced     cache.InformerSynced
227
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
228
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
229
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
230
        nodeKeyMutex    keymutex.KeyMutex
231

232
        servicesLister     v1.ServiceLister
233
        serviceSynced      cache.InformerSynced
234
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
235
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
236
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
237
        svcKeyMutex        keymutex.KeyMutex
238

239
        endpointSlicesLister          discoveryv1.EndpointSliceLister
240
        endpointSlicesSynced          cache.InformerSynced
241
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
242
        epKeyMutex                    keymutex.KeyMutex
243

244
        deploymentsLister appsv1.DeploymentLister
245
        deploymentsSynced cache.InformerSynced
246

247
        npsLister     netv1.NetworkPolicyLister
248
        npsSynced     cache.InformerSynced
249
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
250
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
251
        npKeyMutex    keymutex.KeyMutex
252

253
        sgsLister          kubeovnlister.SecurityGroupLister
254
        sgSynced           cache.InformerSynced
255
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
256
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
257
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
258
        sgKeyMutex         keymutex.KeyMutex
259

260
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
261
        qosPolicySynced      cache.InformerSynced
262
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
263
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
264
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
265

266
        configMapsLister v1.ConfigMapLister
267
        configMapsSynced cache.InformerSynced
268

269
        anpsLister     anplister.AdminNetworkPolicyLister
270
        anpsSynced     cache.InformerSynced
271
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
272
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
273
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
274
        anpKeyMutex    keymutex.KeyMutex
275

276
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
277
        dnsNameResolversSynced          cache.InformerSynced
278
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
279
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
280

281
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
282
        banpsSynced     cache.InformerSynced
283
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
284
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
285
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
286
        banpKeyMutex    keymutex.KeyMutex
287

288
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
289
        cnpsSynced     cache.InformerSynced
290
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
291
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
292
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
293
        cnpKeyMutex    keymutex.KeyMutex
294

295
        csrLister           certListerv1.CertificateSigningRequestLister
296
        csrSynced           cache.InformerSynced
297
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
298

299
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
300
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
301
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
302

303
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
304
        netAttachSynced          cache.InformerSynced
305
        netAttachInformerFactory netAttach.SharedInformerFactory
306

        // Event recorder and the shared informer factories for core Kubernetes,
        // kube-ovn and network-policy-api resources.
307
        recorder               record.EventRecorder
308
        informerFactory        kubeinformers.SharedInformerFactory
309
        cmInformerFactory      kubeinformers.SharedInformerFactory
310
        deployInformerFactory  kubeinformers.SharedInformerFactory
311
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
312
        anpInformerFactory     anpinformer.SharedInformerFactory
313

314
        // Database health check
315
        dbFailureCount int
316

        // NOTE(review): atomic flag; presumably marks that distributed subnets
        // need to be re-synced — the readers/writers are not visible here, confirm.
317
        distributedSubnetNeedSync atomic.Bool
318
}
319

320
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
321
        if rateLimiter == nil {
2✔
322
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
323
        }
1✔
324
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
325
}
326

327
// Run creates and runs a new ovn controller
328
func Run(ctx context.Context, config *Configuration) {
×
329
        klog.V(4).Info("Creating event broadcaster")
×
330
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
331
        eventBroadcaster.StartLogging(klog.Infof)
×
332
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
333
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
334
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
335
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
336
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
337
        )
×
338

×
339
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
340
        if err != nil {
×
341
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
342
        }
×
343

344
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
345
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
346
                        listOption.AllowWatchBookmarks = true
×
347
                }))
×
348
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
349
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
350
                        listOption.AllowWatchBookmarks = true
×
351
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
352
        // deployment informer used to list/watch vpc egress gateway workloads
353
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
354
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
355
                        listOption.AllowWatchBookmarks = true
×
356
                        listOption.LabelSelector = selector.String()
×
357
                }))
×
358
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
359
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
360
                        listOption.AllowWatchBookmarks = true
×
361
                }))
×
362
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
363
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
364
                        listOption.AllowWatchBookmarks = true
×
365
                }))
×
366

367
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
368
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
369
                        listOption.AllowWatchBookmarks = true
×
370
                }),
×
371
        )
372

373
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
374

×
375
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
376
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
377
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
378
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
379
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
380
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
381
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
382
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
383
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
384
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
385
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
386
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
387
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
388
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
389
        podInformer := informerFactory.Core().V1().Pods()
×
390
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
391
        nodeInformer := informerFactory.Core().V1().Nodes()
×
392
        serviceInformer := informerFactory.Core().V1().Services()
×
393
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
394
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
395
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
396
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
397
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
398
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
399
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
400
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
401
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
402
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
403
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
404
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
405
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
406
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
407
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
408
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
409
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
410

×
411
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
412
        controller := &Controller{
×
413
                config:             config,
×
414
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
415
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
416
                ipam:               ovnipam.NewIPAM(),
×
417
                namedPort:          NewNamedPort(),
×
418

×
419
                vpcsLister:           vpcInformer.Lister(),
×
420
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
421
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
422
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
423
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
424
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
425
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
426

×
427
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
428
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
429
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
430
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
431
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
432
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
433
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
434
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
435
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
436
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
437
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
438
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
439
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
440
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
441
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
442
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
443
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
444

×
445
                subnetsLister:           subnetInformer.Lister(),
×
446
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
447
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
448
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
449
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
450
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
451
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
452

×
453
                ippoolLister:            ippoolInformer.Lister(),
×
454
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
455
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
456
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
457
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
458
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
459

×
460
                ipsLister:     ipInformer.Lister(),
×
461
                ipSynced:      ipInformer.Informer().HasSynced,
×
462
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
463
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
464
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
465

×
466
                virtualIpsLister:          virtualIPInformer.Lister(),
×
467
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
468
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
469
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
470
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
471
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
472

×
473
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
474
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
475
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
476
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
477
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
478
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
479

×
480
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
481
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
482
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
483
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
484
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
485

×
486
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
487
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
488
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
489
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
490
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
491

×
492
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
493
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
494
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
495
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
496
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
497

×
498
                vlansLister:     vlanInformer.Lister(),
×
499
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
500
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
501
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
502
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
503
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
504

×
505
                providerNetworksLister: providerNetworkInformer.Lister(),
×
506
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
507

×
508
                podsLister:          podInformer.Lister(),
×
509
                podsSynced:          podInformer.Informer().HasSynced,
×
510
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
511
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
512
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
513
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
514
                                Name:          "DeletePod",
×
515
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
516
                        },
×
517
                ),
×
518
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
519
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
520

×
521
                namespacesLister:  namespaceInformer.Lister(),
×
522
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
523
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
524
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
525

×
526
                nodesLister:     nodeInformer.Lister(),
×
527
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
528
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
529
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
530
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
531
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
532

×
533
                servicesLister:     serviceInformer.Lister(),
×
534
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
535
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
536
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
537
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
538
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
539

×
540
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
541
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
542
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
543
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
544

×
545
                deploymentsLister: deploymentInformer.Lister(),
×
546
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
547

×
548
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
549
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
550
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
551
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
552
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
553

×
554
                configMapsLister: configMapInformer.Lister(),
×
555
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
556

×
557
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
558
                sgsLister:          sgInformer.Lister(),
×
559
                sgSynced:           sgInformer.Informer().HasSynced,
×
560
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
561
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
562
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
563

×
564
                ovnEipsLister:     ovnEipInformer.Lister(),
×
565
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
566
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
567
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
568
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
569
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
570

×
571
                ovnFipsLister:     ovnFipInformer.Lister(),
×
572
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
573
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
574
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
575
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
576

×
577
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
578
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
579
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
580
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
581
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
582

×
583
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
584
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
585
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
586
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
587
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
588

×
589
                csrLister:           csrInformer.Lister(),
×
590
                csrSynced:           csrInformer.Informer().HasSynced,
×
591
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
592

×
593
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
594
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
595
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
596

×
597
                netAttachLister:          netAttachInformer.Lister(),
×
598
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
599
                netAttachInformerFactory: attachNetInformerFactory,
×
600

×
601
                recorder:               recorder,
×
602
                informerFactory:        informerFactory,
×
603
                cmInformerFactory:      cmInformerFactory,
×
604
                deployInformerFactory:  deployInformerFactory,
×
605
                kubeovnInformerFactory: kubeovnInformerFactory,
×
606
                anpInformerFactory:     anpInformerFactory,
×
607
        }
×
608

×
609
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
610
                config.OvnNbAddr,
×
611
                config.OvnTimeout,
×
612
                config.OvsDbConnectTimeout,
×
613
                config.OvsDbInactivityTimeout,
×
614
                config.OvsDbConnectMaxRetry,
×
615
        ); err != nil {
×
616
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
617
        }
×
618
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
619
                config.OvnSbAddr,
×
620
                config.OvnTimeout,
×
621
                config.OvsDbConnectTimeout,
×
622
                config.OvsDbInactivityTimeout,
×
623
                config.OvsDbConnectMaxRetry,
×
624
        ); err != nil {
×
625
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
626
        }
×
627
        if config.EnableLb {
×
628
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
629
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
630
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
631
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
632
                        "DeleteSwitchLBRule",
×
633
                        workqueue.NewTypedMaxOfRateLimiter(
×
634
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
635
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
636
                        ),
×
637
                )
×
638
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
639
                        "UpdateSwitchLBRule",
×
640
                        workqueue.NewTypedMaxOfRateLimiter(
×
641
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
642
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
643
                        ),
×
644
                )
×
645

×
646
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
647
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
648
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
649
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
650
        }
×
651

652
        if config.EnableNP {
×
653
                controller.npsLister = npInformer.Lister()
×
654
                controller.npsSynced = npInformer.Informer().HasSynced
×
655
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
656
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
657
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
658
        }
×
659

660
        if config.EnableANP {
×
661
                controller.anpsLister = anpInformer.Lister()
×
662
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
663
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
664
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
665
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
666
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
667

×
668
                controller.banpsLister = banpInformer.Lister()
×
669
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
670
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
671
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
672
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
673
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
674

×
675
                controller.cnpsLister = cnpInformer.Lister()
×
676
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
677
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
678
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
679
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
680
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
681
        }
×
682

683
        if config.EnableDNSNameResolver {
×
684
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
685
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
686
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
687
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
688
        }
×
689

690
        defer controller.shutdown()
×
691
        klog.Info("Starting OVN controller")
×
692

×
693
        // Wait for the caches to be synced before starting workers
×
694
        controller.informerFactory.Start(ctx.Done())
×
695
        controller.cmInformerFactory.Start(ctx.Done())
×
696
        controller.deployInformerFactory.Start(ctx.Done())
×
697
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
698
        controller.anpInformerFactory.Start(ctx.Done())
×
699
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
700
        controller.StartNetAttachInformerFactory(ctx)
×
701

×
702
        klog.Info("Waiting for informer caches to sync")
×
703
        cacheSyncs := []cache.InformerSynced{
×
704
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
705
                controller.vpcSynced, controller.subnetSynced,
×
706
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
707
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
708
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
709
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
710
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
711
                controller.ovnDnatRuleSynced,
×
712
        }
×
713
        if controller.config.EnableLb {
×
714
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
715
        }
×
716
        if controller.config.EnableNP {
×
717
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
718
        }
×
719
        if controller.config.EnableANP {
×
720
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
721
        }
×
722
        if controller.config.EnableDNSNameResolver {
×
723
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
724
        }
×
725

726
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
727
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
728
        }
×
729

730
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
731
                AddFunc:    controller.enqueueAddPod,
×
732
                DeleteFunc: controller.enqueueDeletePod,
×
733
                UpdateFunc: controller.enqueueUpdatePod,
×
734
        }); err != nil {
×
735
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
736
        }
×
737

738
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
739
                AddFunc:    controller.enqueueAddNamespace,
×
740
                UpdateFunc: controller.enqueueUpdateNamespace,
×
741
                DeleteFunc: controller.enqueueDeleteNamespace,
×
742
        }); err != nil {
×
743
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
744
        }
×
745

746
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
747
                AddFunc:    controller.enqueueAddNode,
×
748
                UpdateFunc: controller.enqueueUpdateNode,
×
749
                DeleteFunc: controller.enqueueDeleteNode,
×
750
        }); err != nil {
×
751
                util.LogFatalAndExit(err, "failed to add node event handler")
×
752
        }
×
753

754
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
755
                AddFunc:    controller.enqueueAddService,
×
756
                DeleteFunc: controller.enqueueDeleteService,
×
757
                UpdateFunc: controller.enqueueUpdateService,
×
758
        }); err != nil {
×
759
                util.LogFatalAndExit(err, "failed to add service event handler")
×
760
        }
×
761

762
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
763
                AddFunc:    controller.enqueueAddEndpointSlice,
×
764
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
765
        }); err != nil {
×
766
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
767
        }
×
768

769
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
770
                AddFunc:    controller.enqueueAddDeployment,
×
771
                UpdateFunc: controller.enqueueUpdateDeployment,
×
772
        }); err != nil {
×
773
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
774
        }
×
775

776
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
777
                AddFunc:    controller.enqueueAddVpc,
×
778
                UpdateFunc: controller.enqueueUpdateVpc,
×
779
                DeleteFunc: controller.enqueueDelVpc,
×
780
        }); err != nil {
×
781
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
782
        }
×
783

784
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
785
                AddFunc:    controller.enqueueAddVpcNatGw,
×
786
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
787
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
788
        }); err != nil {
×
789
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
790
        }
×
791

792
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
793
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
794
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
795
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
796
        }); err != nil {
×
797
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
798
        }
×
799

800
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
801
                AddFunc:    controller.enqueueAddSubnet,
×
802
                UpdateFunc: controller.enqueueUpdateSubnet,
×
803
                DeleteFunc: controller.enqueueDeleteSubnet,
×
804
        }); err != nil {
×
805
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
806
        }
×
807

808
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
809
                AddFunc:    controller.enqueueAddIPPool,
×
810
                UpdateFunc: controller.enqueueUpdateIPPool,
×
811
                DeleteFunc: controller.enqueueDeleteIPPool,
×
812
        }); err != nil {
×
813
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
814
        }
×
815

816
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
817
                AddFunc:    controller.enqueueAddIP,
×
818
                UpdateFunc: controller.enqueueUpdateIP,
×
819
                DeleteFunc: controller.enqueueDelIP,
×
820
        }); err != nil {
×
821
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
822
        }
×
823

824
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
825
                AddFunc:    controller.enqueueAddVlan,
×
826
                DeleteFunc: controller.enqueueDelVlan,
×
827
                UpdateFunc: controller.enqueueUpdateVlan,
×
828
        }); err != nil {
×
829
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
830
        }
×
831

832
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
833
                AddFunc:    controller.enqueueAddSg,
×
834
                DeleteFunc: controller.enqueueDeleteSg,
×
835
                UpdateFunc: controller.enqueueUpdateSg,
×
836
        }); err != nil {
×
837
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
838
        }
×
839

840
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
841
                AddFunc:    controller.enqueueAddVirtualIP,
×
842
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
843
                DeleteFunc: controller.enqueueDelVirtualIP,
×
844
        }); err != nil {
×
845
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
846
        }
×
847

848
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
849
                AddFunc:    controller.enqueueAddIptablesEip,
×
850
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
851
                DeleteFunc: controller.enqueueDelIptablesEip,
×
852
        }); err != nil {
×
853
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
854
        }
×
855

856
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
857
                AddFunc:    controller.enqueueAddIptablesFip,
×
858
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
859
                DeleteFunc: controller.enqueueDelIptablesFip,
×
860
        }); err != nil {
×
861
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
862
        }
×
863

864
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
865
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
866
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
867
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
868
        }); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
870
        }
×
871

872
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
874
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
875
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
876
        }); err != nil {
×
877
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
878
        }
×
879

880
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                AddFunc:    controller.enqueueAddOvnEip,
×
882
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
883
                DeleteFunc: controller.enqueueDelOvnEip,
×
884
        }); err != nil {
×
885
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
886
        }
×
887

888
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                AddFunc:    controller.enqueueAddOvnFip,
×
890
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
891
                DeleteFunc: controller.enqueueDelOvnFip,
×
892
        }); err != nil {
×
893
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
894
        }
×
895

896
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
898
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
899
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
900
        }); err != nil {
×
901
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
902
        }
×
903

904
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
906
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
907
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
908
        }); err != nil {
×
909
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
910
        }
×
911

912
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                AddFunc:    controller.enqueueAddQoSPolicy,
×
914
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
915
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
916
        }); err != nil {
×
917
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
918
        }
×
919

920
        if config.EnableLb {
×
921
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
922
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
923
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
924
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
925
                }); err != nil {
×
926
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
927
                }
×
928

929
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                        AddFunc:    controller.enqueueAddVpcDNS,
×
931
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
932
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
933
                }); err != nil {
×
934
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
935
                }
×
936
        }
937

938
        if config.EnableNP {
×
939
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
940
                        AddFunc:    controller.enqueueAddNp,
×
941
                        UpdateFunc: controller.enqueueUpdateNp,
×
942
                        DeleteFunc: controller.enqueueDeleteNp,
×
943
                }); err != nil {
×
944
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
945
                }
×
946
        }
947

948
        if config.EnableANP {
×
949
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
950
                        AddFunc:    controller.enqueueAddAnp,
×
951
                        UpdateFunc: controller.enqueueUpdateAnp,
×
952
                        DeleteFunc: controller.enqueueDeleteAnp,
×
953
                }); err != nil {
×
954
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
955
                }
×
956

957
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
958
                        AddFunc:    controller.enqueueAddBanp,
×
959
                        UpdateFunc: controller.enqueueUpdateBanp,
×
960
                        DeleteFunc: controller.enqueueDeleteBanp,
×
961
                }); err != nil {
×
962
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
963
                }
×
964

965
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
966
                        AddFunc:    controller.enqueueAddCnp,
×
967
                        UpdateFunc: controller.enqueueUpdateCnp,
×
968
                        DeleteFunc: controller.enqueueDeleteCnp,
×
969
                }); err != nil {
×
970
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
971
                }
×
972

973
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
974
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
975
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
976
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
977
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
978
        }
979

980
        if config.EnableDNSNameResolver {
×
981
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
982
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
983
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
984
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
985
                }); err != nil {
×
986
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
987
                }
×
988
        }
989

990
        if config.EnableOVNIPSec {
×
991
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
992
                        AddFunc:    controller.enqueueAddCsr,
×
993
                        UpdateFunc: controller.enqueueUpdateCsr,
×
994
                        // no need to add delete func for csr
×
995
                }); err != nil {
×
996
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
997
                }
×
998
        }
999

1000
        controller.Run(ctx)
×
1001
}
1002

1003
// Run will set up the event handlers for types we are interested in, as well
1004
// as syncing informer caches and starting workers. It will block until stopCh
1005
// is closed, at which point it will shutdown the workqueue and wait for
1006
// workers to finish processing their current work items.
1007
func (c *Controller) Run(ctx context.Context) {
×
1008
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1009
        // Otherwise, the init process should be placed after all workers have already started working
×
1010
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1011
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1012
        }
×
1013

1014
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1015
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1016
        }
×
1017

1018
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1019
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1020
        }
×
1021

1022
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1023
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1024
        }
×
1025

1026
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1027
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1028
        }
×
1029

1030
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1031
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1032
        }
×
1033

1034
        if err := c.InitOVN(); err != nil {
×
1035
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1036
        }
×
1037

1038
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1039
        if err := c.syncIPCR(); err != nil {
×
1040
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1041
        }
×
1042

1043
        if err := c.syncFinalizers(); err != nil {
×
1044
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1045
        }
×
1046

1047
        if err := c.InitIPAM(); err != nil {
×
1048
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1049
        }
×
1050

1051
        if err := c.syncNodeRoutes(); err != nil {
×
1052
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1053
        }
×
1054

1055
        if err := c.syncSubnetCR(); err != nil {
×
1056
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1057
        }
×
1058

1059
        if err := c.syncVlanCR(); err != nil {
×
1060
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1061
        }
×
1062

1063
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1064
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1065
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1066
                }
×
1067
        }
1068

1069
        // start workers to do all the network operations
1070
        c.startWorkers(ctx)
×
1071

×
1072
        c.initResourceOnce()
×
1073
        <-ctx.Done()
×
1074
        klog.Info("Shutting down workers")
×
1075
}
1076

1077
func (c *Controller) dbStatus() {
×
1078
        const maxFailures = 5
×
1079

×
1080
        done := make(chan error, 2)
×
1081
        go func() {
×
1082
                done <- c.OVNNbClient.Echo(context.Background())
×
1083
        }()
×
1084
        go func() {
×
1085
                done <- c.OVNSbClient.Echo(context.Background())
×
1086
        }()
×
1087

1088
        resultsReceived := 0
×
1089
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1090

×
1091
        for resultsReceived < 2 {
×
1092
                select {
×
1093
                case err := <-done:
×
1094
                        resultsReceived++
×
1095
                        if err != nil {
×
1096
                                c.dbFailureCount++
×
1097
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1098
                                if c.dbFailureCount >= maxFailures {
×
1099
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1100
                                }
×
1101
                                return
×
1102
                        }
1103
                case <-timeout:
×
1104
                        c.dbFailureCount++
×
1105
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1106
                        if c.dbFailureCount >= maxFailures {
×
1107
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1108
                        }
×
1109
                        return
×
1110
                }
1111
        }
1112

1113
        if c.dbFailureCount > 0 {
×
1114
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1115
                c.dbFailureCount = 0
×
1116
        }
×
1117
}
1118

1119
func (c *Controller) shutdown() {
×
1120
        utilruntime.HandleCrash()
×
1121

×
1122
        c.addOrUpdatePodQueue.ShutDown()
×
1123
        c.deletePodQueue.ShutDown()
×
1124
        c.updatePodSecurityQueue.ShutDown()
×
1125

×
1126
        c.addNamespaceQueue.ShutDown()
×
1127

×
1128
        c.addOrUpdateSubnetQueue.ShutDown()
×
1129
        c.deleteSubnetQueue.ShutDown()
×
1130
        c.updateSubnetStatusQueue.ShutDown()
×
1131
        c.syncVirtualPortsQueue.ShutDown()
×
1132

×
1133
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1134
        c.updateIPPoolStatusQueue.ShutDown()
×
1135
        c.deleteIPPoolQueue.ShutDown()
×
1136

×
1137
        c.addNodeQueue.ShutDown()
×
1138
        c.updateNodeQueue.ShutDown()
×
1139
        c.deleteNodeQueue.ShutDown()
×
1140

×
1141
        c.addServiceQueue.ShutDown()
×
1142
        c.deleteServiceQueue.ShutDown()
×
1143
        c.updateServiceQueue.ShutDown()
×
1144
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1145

×
1146
        c.addVlanQueue.ShutDown()
×
1147
        c.delVlanQueue.ShutDown()
×
1148
        c.updateVlanQueue.ShutDown()
×
1149

×
1150
        c.addOrUpdateVpcQueue.ShutDown()
×
1151
        c.updateVpcStatusQueue.ShutDown()
×
1152
        c.delVpcQueue.ShutDown()
×
1153

×
1154
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1155
        c.initVpcNatGatewayQueue.ShutDown()
×
1156
        c.delVpcNatGatewayQueue.ShutDown()
×
1157
        c.updateVpcEipQueue.ShutDown()
×
1158
        c.updateVpcFloatingIPQueue.ShutDown()
×
1159
        c.updateVpcDnatQueue.ShutDown()
×
1160
        c.updateVpcSnatQueue.ShutDown()
×
1161
        c.updateVpcSubnetQueue.ShutDown()
×
1162

×
1163
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1164
        c.delVpcEgressGatewayQueue.ShutDown()
×
1165

×
1166
        if c.config.EnableLb {
×
1167
                c.addSwitchLBRuleQueue.ShutDown()
×
1168
                c.delSwitchLBRuleQueue.ShutDown()
×
1169
                c.updateSwitchLBRuleQueue.ShutDown()
×
1170

×
1171
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1172
                c.delVpcDNSQueue.ShutDown()
×
1173
        }
×
1174

1175
        c.addIPQueue.ShutDown()
×
1176
        c.updateIPQueue.ShutDown()
×
1177
        c.delIPQueue.ShutDown()
×
1178

×
1179
        c.addVirtualIPQueue.ShutDown()
×
1180
        c.updateVirtualIPQueue.ShutDown()
×
1181
        c.updateVirtualParentsQueue.ShutDown()
×
1182
        c.delVirtualIPQueue.ShutDown()
×
1183

×
1184
        c.addIptablesEipQueue.ShutDown()
×
1185
        c.updateIptablesEipQueue.ShutDown()
×
1186
        c.resetIptablesEipQueue.ShutDown()
×
1187
        c.delIptablesEipQueue.ShutDown()
×
1188

×
1189
        c.addIptablesFipQueue.ShutDown()
×
1190
        c.updateIptablesFipQueue.ShutDown()
×
1191
        c.delIptablesFipQueue.ShutDown()
×
1192

×
1193
        c.addIptablesDnatRuleQueue.ShutDown()
×
1194
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1195
        c.delIptablesDnatRuleQueue.ShutDown()
×
1196

×
1197
        c.addIptablesSnatRuleQueue.ShutDown()
×
1198
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1199
        c.delIptablesSnatRuleQueue.ShutDown()
×
1200

×
1201
        c.addQoSPolicyQueue.ShutDown()
×
1202
        c.updateQoSPolicyQueue.ShutDown()
×
1203
        c.delQoSPolicyQueue.ShutDown()
×
1204

×
1205
        c.addOvnEipQueue.ShutDown()
×
1206
        c.updateOvnEipQueue.ShutDown()
×
1207
        c.resetOvnEipQueue.ShutDown()
×
1208
        c.delOvnEipQueue.ShutDown()
×
1209

×
1210
        c.addOvnFipQueue.ShutDown()
×
1211
        c.updateOvnFipQueue.ShutDown()
×
1212
        c.delOvnFipQueue.ShutDown()
×
1213

×
1214
        c.addOvnSnatRuleQueue.ShutDown()
×
1215
        c.updateOvnSnatRuleQueue.ShutDown()
×
1216
        c.delOvnSnatRuleQueue.ShutDown()
×
1217

×
1218
        c.addOvnDnatRuleQueue.ShutDown()
×
1219
        c.updateOvnDnatRuleQueue.ShutDown()
×
1220
        c.delOvnDnatRuleQueue.ShutDown()
×
1221

×
1222
        if c.config.EnableNP {
×
1223
                c.updateNpQueue.ShutDown()
×
1224
                c.deleteNpQueue.ShutDown()
×
1225
        }
×
1226
        if c.config.EnableANP {
×
1227
                c.addAnpQueue.ShutDown()
×
1228
                c.updateAnpQueue.ShutDown()
×
1229
                c.deleteAnpQueue.ShutDown()
×
1230

×
1231
                c.addBanpQueue.ShutDown()
×
1232
                c.updateBanpQueue.ShutDown()
×
1233
                c.deleteBanpQueue.ShutDown()
×
1234

×
1235
                c.addCnpQueue.ShutDown()
×
1236
                c.updateCnpQueue.ShutDown()
×
1237
                c.deleteCnpQueue.ShutDown()
×
1238
        }
×
1239

1240
        if c.config.EnableDNSNameResolver {
×
1241
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1242
                c.deleteDNSNameResolverQueue.ShutDown()
×
1243
        }
×
1244

1245
        c.addOrUpdateSgQueue.ShutDown()
×
1246
        c.delSgQueue.ShutDown()
×
1247
        c.syncSgPortsQueue.ShutDown()
×
1248

×
1249
        c.addOrUpdateCsrQueue.ShutDown()
×
1250

×
1251
        if c.config.EnableLiveMigrationOptimize {
×
1252
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1253
        }
×
1254
}
1255

1256
func (c *Controller) startWorkers(ctx context.Context) {
×
1257
        klog.Info("Starting workers")
×
1258

×
1259
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1260
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1261
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1262

×
1263
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1264
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1265
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1266
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1267
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1268
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1269
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1272
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1273
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1274
        // add default and join subnet and wait them ready
×
1275
        for range c.config.WorkerNum {
×
1276
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1277
        }
×
1278
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1280
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1281
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1282
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1283
                klog.Infof("wait for subnets %v ready", subnets)
×
1284

×
1285
                return c.allSubnetReady(subnets...)
×
1286
        })
×
1287
        if err != nil {
×
1288
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1289
        }
×
1290

1291
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1292
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1293
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1294

×
1295
        // run node worker before handle any pods
×
1296
        for range c.config.WorkerNum {
×
1297
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1298
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1299
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1300
        }
×
1301
        for {
×
1302
                ready := true
×
1303
                time.Sleep(3 * time.Second)
×
1304
                nodes, err := c.nodesLister.List(labels.Everything())
×
1305
                if err != nil {
×
1306
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1307
                }
×
1308
                for _, node := range nodes {
×
1309
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1310
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1311
                                ready = false
×
1312
                                break
×
1313
                        }
1314
                }
1315
                if ready {
×
1316
                        break
×
1317
                }
1318
        }
1319

1320
        if c.config.EnableLb {
×
1321
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1322
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1323
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1324

×
1325
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1326
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1327
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1328

×
1329
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1330
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1331
                go wait.Until(func() {
×
1332
                        c.resyncVpcDNSConfig()
×
1333
                }, 5*time.Second, ctx.Done())
×
1334
        }
1335

1336
        for range c.config.WorkerNum {
×
1337
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1338
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1339
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1340

×
1341
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1342
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1343
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1344
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1345
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1346

×
1347
                if c.config.EnableLb {
×
1348
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1349
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1350
                }
×
1351

1352
                if c.config.EnableNP {
×
1353
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1354
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1355
                }
×
1356

1357
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1358
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1359
        }
1360

1361
        if c.config.EnableEipSnat {
×
1362
                go wait.Until(func() {
×
1363
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1364
                        c.resyncExternalGateway()
×
1365
                }, time.Second, ctx.Done())
×
1366

1367
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1368
                c.OVNNbClient.MonitorBFD()
×
1369
        }
1370
        // TODO: we should merge these two vpc nat config into one config and resync them together
1371
        go wait.Until(func() {
×
1372
                c.resyncVpcNatGwConfig()
×
1373
        }, time.Second, ctx.Done())
×
1374

1375
        go wait.Until(func() {
×
1376
                c.resyncVpcNatConfig()
×
1377
        }, time.Second, ctx.Done())
×
1378

1379
        if c.config.GCInterval != 0 {
×
1380
                go wait.Until(func() {
×
1381
                        if err := c.markAndCleanLSP(); err != nil {
×
1382
                                klog.Errorf("gc lsp error: %v", err)
×
1383
                        }
×
1384
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1385
        }
1386

1387
        go wait.Until(func() {
×
1388
                if err := c.inspectPod(); err != nil {
×
1389
                        klog.Errorf("inspection error: %v", err)
×
1390
                }
×
1391
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1392

1393
        if c.config.EnableExternalVpc {
×
1394
                go wait.Until(func() {
×
1395
                        c.syncExternalVpc()
×
1396
                }, 5*time.Second, ctx.Done())
×
1397
        }
1398

1399
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1400
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1401
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1402
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1403

×
1404
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1405
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1406
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1407
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1408

×
1409
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1410
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1411
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1412

×
1413
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1414
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1415
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1416

×
1417
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1418
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1419
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1420

×
1421
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1422

×
1423
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1424
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1425
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1426

×
1427
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1428
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1429
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1430
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1431

×
1432
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1433
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1434
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1436

×
1437
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1438
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1439
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1440

×
1441
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1442
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1443
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1444

×
1445
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1446
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1447
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1448

×
1449
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1450
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1451
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1452

×
1453
        if c.config.EnableANP {
×
1454
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1455
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1456
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1457

×
1458
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1459
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1460
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1461

×
1462
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1463
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1464
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1465
        }
×
1466

1467
        if c.config.EnableDNSNameResolver {
×
1468
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1469
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1470
        }
×
1471

1472
        if c.config.EnableLiveMigrationOptimize {
×
1473
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1474
        }
×
1475

1476
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1477

×
1478
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1479
}
1480

1481
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1482
        for _, lsName := range subnets {
2✔
1483
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1484
                if err != nil {
1✔
1485
                        klog.Error(err)
×
1486
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1487
                }
×
1488

1489
                if !exist {
2✔
1490
                        return false, nil
1✔
1491
                }
1✔
1492
        }
1493

1494
        return true, nil
1✔
1495
}
1496

1497
func (c *Controller) initResourceOnce() {
×
1498
        c.registerSubnetMetrics()
×
1499

×
1500
        if err := c.initNodeChassis(); err != nil {
×
1501
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1502
        }
×
1503

1504
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1505
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1506
        }
×
1507
        if err := c.syncSecurityGroup(); err != nil {
×
1508
                util.LogFatalAndExit(err, "failed to sync security group")
×
1509
        }
×
1510

1511
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1512
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1513
        }
×
1514

1515
        if err := c.initVpcNatGw(); err != nil {
×
1516
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1517
        }
×
1518
        if c.config.EnableLb {
×
1519
                if err := c.initVpcDNSConfig(); err != nil {
×
1520
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1521
                }
×
1522
        }
1523

1524
        // remove resources in ovndb that not exist any more in kubernetes resources
1525
        // process gc at last in case of affecting other init process
1526
        if err := c.gc(); err != nil {
×
1527
                util.LogFatalAndExit(err, "failed to run gc")
×
1528
        }
×
1529
}
1530

1531
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1532
        item, shutdown := queue.Get()
×
1533
        if shutdown {
×
1534
                return false
×
1535
        }
×
1536

1537
        err := func(item T) error {
×
1538
                defer queue.Done(item)
×
1539
                if err := handler(item); err != nil {
×
1540
                        queue.AddRateLimited(item)
×
1541
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1542
                }
×
1543
                queue.Forget(item)
×
1544
                return nil
×
1545
        }(item)
1546
        if err != nil {
×
1547
                utilruntime.HandleError(err)
×
1548
                return true
×
1549
        }
×
1550
        return true
×
1551
}
1552

1553
func getWorkItemKey(obj any) string {
×
1554
        switch v := obj.(type) {
×
1555
        case string:
×
1556
                return v
×
1557
        case *vpcService:
×
1558
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1559
        case *AdminNetworkPolicyChangedDelta:
×
1560
                return v.key
×
1561
        case *SlrInfo:
×
1562
                return v.Name
×
1563
        default:
×
1564
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1565
                if err != nil {
×
1566
                        utilruntime.HandleError(err)
×
1567
                        return ""
×
1568
                }
×
1569
                return key
×
1570
        }
1571
}
1572

1573
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1574
        return func() {
×
1575
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1576
                }
×
1577
        }
1578
}
1579

1580
// apiResourceExists checks if all specified kinds exist in the given group version.
1581
// It returns true if all kinds are found, false otherwise.
1582
// Parameters:
1583
// - discoveryClient: The discovery client to use for querying API resources.
1584
// - gv: The group version string (e.g., "apps/v1").
1585
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
NEW
1586
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
NEW
1587
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
NEW
1588
        if err != nil {
×
NEW
1589
                if k8serrors.IsNotFound(err) {
×
NEW
1590
                        return false, nil
×
NEW
1591
                }
×
NEW
1592
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1593
        }
1594

NEW
1595
        existingKinds := set.New[string]()
×
NEW
1596
        for _, apiResource := range apiResourceLists.APIResources {
×
NEW
1597
                existingKinds.Insert(apiResource.Kind)
×
NEW
1598
        }
×
1599

NEW
1600
        return existingKinds.HasAll(kinds...), nil
×
1601
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc