• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16460561632

23 Jul 2025 03:08AM UTC coverage: 21.431% (-0.002%) from 21.433%
16460561632

push

github

web-flow
fix concurrent map race (#5510)

* [fix] use map to replace  the informer cache modify

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

* [fix] change of conditons

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

* [fix] clean up cached entry

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>

---------

Signed-off-by: zhanglin02 <zhanglin02@kanzhun.com>
Co-authored-by: zhanglin02 <zhanglin02@kanzhun.com>

0 of 15 new or added lines in 3 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

10540 of 49180 relevant lines covered (21.43%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.23
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
11
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
12
        "github.com/puzpuzpuz/xsync/v4"
13
        "golang.org/x/time/rate"
14
        corev1 "k8s.io/api/core/v1"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
        "k8s.io/apimachinery/pkg/util/wait"
19
        kubeinformers "k8s.io/client-go/informers"
20
        "k8s.io/client-go/kubernetes/scheme"
21
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
22
        appsv1 "k8s.io/client-go/listers/apps/v1"
23
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
24
        v1 "k8s.io/client-go/listers/core/v1"
25
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
26
        netv1 "k8s.io/client-go/listers/networking/v1"
27
        "k8s.io/client-go/tools/cache"
28
        "k8s.io/client-go/tools/record"
29
        "k8s.io/client-go/util/workqueue"
30
        "k8s.io/klog/v2"
31
        "k8s.io/utils/keymutex"
32
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
33
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
34
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
35

36
        "github.com/kubeovn/kube-ovn/pkg/informer"
37

38
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
39
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
40
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
41
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
42
        "github.com/kubeovn/kube-ovn/pkg/ovs"
43
        "github.com/kubeovn/kube-ovn/pkg/util"
44
)
45

46
const controllerAgentName = "kube-ovn-controller"
47

48
const (
49
        logicalSwitchKey              = "ls"
50
        logicalRouterKey              = "lr"
51
        portGroupKey                  = "pg"
52
        networkPolicyKey              = "np"
53
        sgKey                         = "sg"
54
        associatedSgKeyPrefix         = "associated_sg_"
55
        sgsKey                        = "security_groups"
56
        u2oKey                        = "u2o"
57
        adminNetworkPolicyKey         = "anp"
58
        baselineAdminNetworkPolicyKey = "banp"
59
)
60

61
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
62
type Controller struct {
63
        config *Configuration
64

65
        ipam           *ovnipam.IPAM
66
        namedPort      *NamedPort
67
        anpPrioNameMap map[int32]string
68
        anpNamePrioMap map[string]int32
69

70
        OVNNbClient ovs.NbClient
71
        OVNSbClient ovs.SbClient
72

73
        // ExternalGatewayType define external gateway type, centralized
74
        ExternalGatewayType string
75

76
        podsLister             v1.PodLister
77
        podsSynced             cache.InformerSynced
78
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
79
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
80
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
81
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
82
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
83
        podKeyMutex            keymutex.KeyMutex
84

85
        vpcsLister           kubeovnlister.VpcLister
86
        vpcSynced            cache.InformerSynced
87
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
88
        vpcLastPoliciesMap   *xsync.Map[string, string]
89
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
90
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
91
        vpcKeyMutex          keymutex.KeyMutex
92

93
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
94
        vpcNatGatewaySynced           cache.InformerSynced
95
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
96
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
97
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
98
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
99
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
100
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
101
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
102
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
103
        vpcNatGwKeyMutex              keymutex.KeyMutex
104

105
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
106
        vpcEgressGatewaySynced           cache.InformerSynced
107
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
108
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
109
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
110

111
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
112
        switchLBRuleSynced      cache.InformerSynced
113
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
114
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
115
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
116

117
        vpcDNSLister           kubeovnlister.VpcDnsLister
118
        vpcDNSSynced           cache.InformerSynced
119
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
120
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
121

122
        subnetsLister           kubeovnlister.SubnetLister
123
        subnetSynced            cache.InformerSynced
124
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
125
        subnetLastVpcNameMap    *xsync.Map[string, string]
126
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
127
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
128
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
129
        subnetKeyMutex          keymutex.KeyMutex
130

131
        ippoolLister            kubeovnlister.IPPoolLister
132
        ippoolSynced            cache.InformerSynced
133
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
134
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
135
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
136
        ippoolKeyMutex          keymutex.KeyMutex
137

138
        ipsLister     kubeovnlister.IPLister
139
        ipSynced      cache.InformerSynced
140
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
141
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
142
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
143

144
        virtualIpsLister          kubeovnlister.VipLister
145
        virtualIpsSynced          cache.InformerSynced
146
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
147
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
148
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
149
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
150

151
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
152
        iptablesEipSynced      cache.InformerSynced
153
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
154
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
155
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
156
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
157

158
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
159
        iptablesFipSynced      cache.InformerSynced
160
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
161
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
162
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
163

164
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
165
        iptablesDnatRuleSynced      cache.InformerSynced
166
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
168
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
169

170
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
171
        iptablesSnatRuleSynced      cache.InformerSynced
172
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
173
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
174
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
175

176
        ovnEipsLister     kubeovnlister.OvnEipLister
177
        ovnEipSynced      cache.InformerSynced
178
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
179
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
180
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
181
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
182

183
        ovnFipsLister     kubeovnlister.OvnFipLister
184
        ovnFipSynced      cache.InformerSynced
185
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
187
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
190
        ovnSnatRuleSynced      cache.InformerSynced
191
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
193
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
196
        ovnDnatRuleSynced      cache.InformerSynced
197
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
199
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
200

201
        providerNetworksLister kubeovnlister.ProviderNetworkLister
202
        providerNetworkSynced  cache.InformerSynced
203

204
        vlansLister     kubeovnlister.VlanLister
205
        vlanSynced      cache.InformerSynced
206
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
207
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
208
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
209
        vlanKeyMutex    keymutex.KeyMutex
210

211
        namespacesLister  v1.NamespaceLister
212
        namespacesSynced  cache.InformerSynced
213
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
214
        nsKeyMutex        keymutex.KeyMutex
215

216
        nodesLister     v1.NodeLister
217
        nodesSynced     cache.InformerSynced
218
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
219
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
220
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
221
        nodeKeyMutex    keymutex.KeyMutex
222

223
        servicesLister     v1.ServiceLister
224
        serviceSynced      cache.InformerSynced
225
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
226
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
227
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
228
        svcKeyMutex        keymutex.KeyMutex
229

230
        endpointSlicesLister          discoveryv1.EndpointSliceLister
231
        endpointSlicesSynced          cache.InformerSynced
232
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
233
        epKeyMutex                    keymutex.KeyMutex
234

235
        deploymentsLister appsv1.DeploymentLister
236
        deploymentsSynced cache.InformerSynced
237

238
        npsLister     netv1.NetworkPolicyLister
239
        npsSynced     cache.InformerSynced
240
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
241
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
242
        npKeyMutex    keymutex.KeyMutex
243

244
        sgsLister          kubeovnlister.SecurityGroupLister
245
        sgSynced           cache.InformerSynced
246
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
247
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
248
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
249
        sgKeyMutex         keymutex.KeyMutex
250

251
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
252
        qosPolicySynced      cache.InformerSynced
253
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
254
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
255
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
256

257
        configMapsLister v1.ConfigMapLister
258
        configMapsSynced cache.InformerSynced
259

260
        anpsLister     anplister.AdminNetworkPolicyLister
261
        anpsSynced     cache.InformerSynced
262
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
263
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
264
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
265
        anpKeyMutex    keymutex.KeyMutex
266

267
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
268
        banpsSynced     cache.InformerSynced
269
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
270
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
271
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
272
        banpKeyMutex    keymutex.KeyMutex
273

274
        csrLister           certListerv1.CertificateSigningRequestLister
275
        csrSynced           cache.InformerSynced
276
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
277

278
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
279
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
280
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
281

282
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
283
        netAttachSynced          cache.InformerSynced
284
        netAttachInformerFactory netAttach.SharedInformerFactory
285

286
        recorder               record.EventRecorder
287
        informerFactory        kubeinformers.SharedInformerFactory
288
        cmInformerFactory      kubeinformers.SharedInformerFactory
289
        deployInformerFactory  kubeinformers.SharedInformerFactory
290
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
291
        anpInformerFactory     anpinformer.SharedInformerFactory
292

293
        // Database health check
294
        dbFailureCount int
295
}
296

297
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
298
        if rateLimiter == nil {
2✔
299
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
300
        }
1✔
301
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
302
}
303

304
// Run creates and runs a new ovn controller
305
func Run(ctx context.Context, config *Configuration) {
×
306
        klog.V(4).Info("Creating event broadcaster")
×
307
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
308
        eventBroadcaster.StartLogging(klog.Infof)
×
309
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
310
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
311
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
312
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
313
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
314
        )
×
315

×
316
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
317
        if err != nil {
×
318
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
319
        }
×
320

321
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
322
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
323
                        listOption.AllowWatchBookmarks = true
×
324
                }))
×
325
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
326
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
327
                        listOption.AllowWatchBookmarks = true
×
328
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
329
        // deployment informer used to list/watch vpc egress gateway workloads
330
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
331
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
332
                        listOption.AllowWatchBookmarks = true
×
333
                        listOption.LabelSelector = selector.String()
×
334
                }))
×
335
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
336
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
337
                        listOption.AllowWatchBookmarks = true
×
338
                }))
×
339
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
340
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
341
                        listOption.AllowWatchBookmarks = true
×
342
                }))
×
343

344
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
345
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
346
                        listOption.AllowWatchBookmarks = true
×
347
                }),
×
348
        )
349

350
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
351

×
352
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
353
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
354
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
355
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
356
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
357
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
358
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
359
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
360
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
361
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
362
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
363
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
364
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
365
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
366
        podInformer := informerFactory.Core().V1().Pods()
×
367
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
368
        nodeInformer := informerFactory.Core().V1().Nodes()
×
369
        serviceInformer := informerFactory.Core().V1().Services()
×
370
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
371
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
372
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
373
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
374
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
375
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
376
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
377
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
378
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
379
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
380
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
381
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
382
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
383
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
384
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
385

×
386
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
387
        controller := &Controller{
×
388
                config:             config,
×
389
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
390
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
391
                ipam:               ovnipam.NewIPAM(),
×
392
                namedPort:          NewNamedPort(),
×
393

×
394
                vpcsLister:           vpcInformer.Lister(),
×
395
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
396
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
NEW
397
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
398
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
399
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
400
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
401

×
402
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
403
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
404
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
405
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
406
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
407
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
408
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
409
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
410
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
411
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
412
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
413

×
414
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
415
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
416
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
417
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
418
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
419

×
420
                subnetsLister:           subnetInformer.Lister(),
×
421
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
422
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
NEW
423
                subnetLastVpcNameMap:    xsync.NewMap[string, string](),
×
424
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
425
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
426
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
427
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
428

×
429
                ippoolLister:            ippoolInformer.Lister(),
×
430
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
431
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
432
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
433
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
434
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
435

×
436
                ipsLister:     ipInformer.Lister(),
×
437
                ipSynced:      ipInformer.Informer().HasSynced,
×
438
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
439
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
440
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
441

×
442
                virtualIpsLister:          virtualIPInformer.Lister(),
×
443
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
444
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
445
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
446
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
447
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
448

×
449
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
450
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
451
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
452
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
453
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
454
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
455

×
456
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
457
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
458
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
459
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
460
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
461

×
462
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
463
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
464
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
465
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
466
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
467

×
468
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
469
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
470
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
471
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
472
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
473

×
474
                vlansLister:     vlanInformer.Lister(),
×
475
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
476
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
477
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
478
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
479
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
480

×
481
                providerNetworksLister: providerNetworkInformer.Lister(),
×
482
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
483

×
484
                podsLister:          podInformer.Lister(),
×
485
                podsSynced:          podInformer.Informer().HasSynced,
×
486
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
487
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
488
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
489
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
490
                                Name:          "DeletePod",
×
491
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
492
                        },
×
493
                ),
×
494
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
495
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
496

×
497
                namespacesLister:  namespaceInformer.Lister(),
×
498
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
499
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
500
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
501

×
502
                nodesLister:     nodeInformer.Lister(),
×
503
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
504
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
505
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
506
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
507
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
508

×
509
                servicesLister:     serviceInformer.Lister(),
×
510
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
511
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
512
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
513
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
514
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
515

×
516
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
517
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
518
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
519
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
520

×
521
                deploymentsLister: deploymentInformer.Lister(),
×
522
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
523

×
524
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
525
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
526
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
527
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
528
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
529

×
530
                configMapsLister: configMapInformer.Lister(),
×
531
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
532

×
533
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
534
                sgsLister:          sgInformer.Lister(),
×
535
                sgSynced:           sgInformer.Informer().HasSynced,
×
536
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
537
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
538
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
539

×
540
                ovnEipsLister:     ovnEipInformer.Lister(),
×
541
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
542
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
543
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
544
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
545
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
546

×
547
                ovnFipsLister:     ovnFipInformer.Lister(),
×
548
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
549
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
550
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
551
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
552

×
553
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
554
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
555
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
556
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
557
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
558

×
559
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
560
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
561
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
562
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
563
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
564

×
565
                csrLister:           csrInformer.Lister(),
×
566
                csrSynced:           csrInformer.Informer().HasSynced,
×
567
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
568

×
569
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
570
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
571
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
572

×
573
                netAttachLister:          netAttachInformer.Lister(),
×
574
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
575
                netAttachInformerFactory: attachNetInformerFactory,
×
576

×
577
                recorder:               recorder,
×
578
                informerFactory:        informerFactory,
×
579
                cmInformerFactory:      cmInformerFactory,
×
580
                deployInformerFactory:  deployInformerFactory,
×
581
                kubeovnInformerFactory: kubeovnInformerFactory,
×
582
                anpInformerFactory:     anpInformerFactory,
×
583
        }
×
584

×
585
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
586
                config.OvnNbAddr,
×
587
                config.OvnTimeout,
×
588
                config.OvsDbConnectTimeout,
×
589
                config.OvsDbInactivityTimeout,
×
590
                config.OvsDbConnectMaxRetry,
×
591
        ); err != nil {
×
592
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
593
        }
×
594
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
595
                config.OvnSbAddr,
×
596
                config.OvnTimeout,
×
597
                config.OvsDbConnectTimeout,
×
598
                config.OvsDbInactivityTimeout,
×
599
                config.OvsDbConnectMaxRetry,
×
600
        ); err != nil {
×
601
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
602
        }
×
603
        if config.EnableLb {
×
604
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
605
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
606
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
607
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
608
                        "DeleteSwitchLBRule",
×
609
                        workqueue.NewTypedMaxOfRateLimiter(
×
610
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
611
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
612
                        ),
×
613
                )
×
614
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
615
                        "UpdateSwitchLBRule",
×
616
                        workqueue.NewTypedMaxOfRateLimiter(
×
617
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
618
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
619
                        ),
×
620
                )
×
621

×
622
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
623
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
624
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
625
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
626
        }
×
627

628
        if config.EnableNP {
×
629
                controller.npsLister = npInformer.Lister()
×
630
                controller.npsSynced = npInformer.Informer().HasSynced
×
631
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
632
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
633
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
634
        }
×
635

636
        if config.EnableANP {
×
637
                controller.anpsLister = anpInformer.Lister()
×
638
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
639
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
640
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
641
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
642
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
643

×
644
                controller.banpsLister = banpInformer.Lister()
×
645
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
646
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
647
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
648
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
649
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
650
        }
×
651

652
        defer controller.shutdown()
×
653
        klog.Info("Starting OVN controller")
×
654

×
655
        // Wait for the caches to be synced before starting workers
×
656
        controller.informerFactory.Start(ctx.Done())
×
657
        controller.cmInformerFactory.Start(ctx.Done())
×
658
        controller.deployInformerFactory.Start(ctx.Done())
×
659
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
660
        controller.anpInformerFactory.Start(ctx.Done())
×
661
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
662
        controller.StartNetAttachInformerFactory(ctx)
×
663

×
664
        klog.Info("Waiting for informer caches to sync")
×
665
        cacheSyncs := []cache.InformerSynced{
×
666
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
667
                controller.vpcSynced, controller.subnetSynced,
×
668
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
669
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
670
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
671
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
672
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
673
                controller.ovnDnatRuleSynced,
×
674
        }
×
675
        if controller.config.EnableLb {
×
676
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
677
        }
×
678
        if controller.config.EnableNP {
×
679
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
680
        }
×
681
        if controller.config.EnableANP {
×
682
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
683
        }
×
684

685
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
686
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
687
        }
×
688

689
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
690
                AddFunc:    controller.enqueueAddPod,
×
691
                DeleteFunc: controller.enqueueDeletePod,
×
692
                UpdateFunc: controller.enqueueUpdatePod,
×
693
        }); err != nil {
×
694
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
695
        }
×
696

697
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
698
                AddFunc:    controller.enqueueAddNamespace,
×
699
                UpdateFunc: controller.enqueueUpdateNamespace,
×
700
                DeleteFunc: controller.enqueueDeleteNamespace,
×
701
        }); err != nil {
×
702
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
703
        }
×
704

705
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
706
                AddFunc:    controller.enqueueAddNode,
×
707
                UpdateFunc: controller.enqueueUpdateNode,
×
708
                DeleteFunc: controller.enqueueDeleteNode,
×
709
        }); err != nil {
×
710
                util.LogFatalAndExit(err, "failed to add node event handler")
×
711
        }
×
712

713
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
714
                AddFunc:    controller.enqueueAddService,
×
715
                DeleteFunc: controller.enqueueDeleteService,
×
716
                UpdateFunc: controller.enqueueUpdateService,
×
717
        }); err != nil {
×
718
                util.LogFatalAndExit(err, "failed to add service event handler")
×
719
        }
×
720

721
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
722
                AddFunc:    controller.enqueueAddEndpointSlice,
×
723
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
724
        }); err != nil {
×
725
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
726
        }
×
727

728
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
729
                AddFunc:    controller.enqueueAddDeployment,
×
730
                UpdateFunc: controller.enqueueUpdateDeployment,
×
731
        }); err != nil {
×
732
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
733
        }
×
734

735
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
736
                AddFunc:    controller.enqueueAddVpc,
×
737
                UpdateFunc: controller.enqueueUpdateVpc,
×
738
                DeleteFunc: controller.enqueueDelVpc,
×
739
        }); err != nil {
×
740
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
741
        }
×
742

743
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
744
                AddFunc:    controller.enqueueAddVpcNatGw,
×
745
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
746
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
747
        }); err != nil {
×
748
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
749
        }
×
750

751
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
752
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
753
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
754
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
755
        }); err != nil {
×
756
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
757
        }
×
758

759
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
760
                AddFunc:    controller.enqueueAddSubnet,
×
761
                UpdateFunc: controller.enqueueUpdateSubnet,
×
762
                DeleteFunc: controller.enqueueDeleteSubnet,
×
763
        }); err != nil {
×
764
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
765
        }
×
766

767
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
768
                AddFunc:    controller.enqueueAddIPPool,
×
769
                UpdateFunc: controller.enqueueUpdateIPPool,
×
770
                DeleteFunc: controller.enqueueDeleteIPPool,
×
771
        }); err != nil {
×
772
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
773
        }
×
774

775
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
776
                AddFunc:    controller.enqueueAddIP,
×
777
                UpdateFunc: controller.enqueueUpdateIP,
×
778
                DeleteFunc: controller.enqueueDelIP,
×
779
        }); err != nil {
×
780
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
781
        }
×
782

783
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
784
                AddFunc:    controller.enqueueAddVlan,
×
785
                DeleteFunc: controller.enqueueDelVlan,
×
786
                UpdateFunc: controller.enqueueUpdateVlan,
×
787
        }); err != nil {
×
788
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
789
        }
×
790

791
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
792
                AddFunc:    controller.enqueueAddSg,
×
793
                DeleteFunc: controller.enqueueDeleteSg,
×
794
                UpdateFunc: controller.enqueueUpdateSg,
×
795
        }); err != nil {
×
796
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
797
        }
×
798

799
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
800
                AddFunc:    controller.enqueueAddVirtualIP,
×
801
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
802
                DeleteFunc: controller.enqueueDelVirtualIP,
×
803
        }); err != nil {
×
804
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
805
        }
×
806

807
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
808
                AddFunc:    controller.enqueueAddIptablesEip,
×
809
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
810
                DeleteFunc: controller.enqueueDelIptablesEip,
×
811
        }); err != nil {
×
812
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
813
        }
×
814

815
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
816
                AddFunc:    controller.enqueueAddIptablesFip,
×
817
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
818
                DeleteFunc: controller.enqueueDelIptablesFip,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
821
        }
×
822

823
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
825
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
826
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
829
        }
×
830

831
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
833
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
834
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
837
        }
×
838

839
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddOvnEip,
×
841
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
842
                DeleteFunc: controller.enqueueDelOvnEip,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
845
        }
×
846

847
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddOvnFip,
×
849
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
850
                DeleteFunc: controller.enqueueDelOvnFip,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
853
        }
×
854

855
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
857
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
858
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
859
        }); err != nil {
×
860
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
861
        }
×
862

863
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
865
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
866
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
867
        }); err != nil {
×
868
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
869
        }
×
870

871
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
872
                AddFunc:    controller.enqueueAddQoSPolicy,
×
873
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
874
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
875
        }); err != nil {
×
876
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
877
        }
×
878

879
        if config.EnableLb {
×
880
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
882
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
883
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
884
                }); err != nil {
×
885
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
886
                }
×
887

888
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                        AddFunc:    controller.enqueueAddVpcDNS,
×
890
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
891
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
892
                }); err != nil {
×
893
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
894
                }
×
895
        }
896

897
        if config.EnableNP {
×
898
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
899
                        AddFunc:    controller.enqueueAddNp,
×
900
                        UpdateFunc: controller.enqueueUpdateNp,
×
901
                        DeleteFunc: controller.enqueueDeleteNp,
×
902
                }); err != nil {
×
903
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
904
                }
×
905
        }
906

907
        if config.EnableANP {
×
908
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
909
                        AddFunc:    controller.enqueueAddAnp,
×
910
                        UpdateFunc: controller.enqueueUpdateAnp,
×
911
                        DeleteFunc: controller.enqueueDeleteAnp,
×
912
                }); err != nil {
×
913
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
914
                }
×
915

916
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
917
                        AddFunc:    controller.enqueueAddBanp,
×
918
                        UpdateFunc: controller.enqueueUpdateBanp,
×
919
                        DeleteFunc: controller.enqueueDeleteBanp,
×
920
                }); err != nil {
×
921
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
922
                }
×
923

924
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
925
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
926
        }
927

928
        if config.EnableOVNIPSec {
×
929
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
930
                        AddFunc:    controller.enqueueAddCsr,
×
931
                        UpdateFunc: controller.enqueueUpdateCsr,
×
932
                        // no need to add delete func for csr
×
933
                }); err != nil {
×
934
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
935
                }
×
936
        }
937

938
        controller.Run(ctx)
×
939
}
940

941
// Run will set up the event handlers for types we are interested in, as well
942
// as syncing informer caches and starting workers. It will block until stopCh
943
// is closed, at which point it will shutdown the workqueue and wait for
944
// workers to finish processing their current work items.
945
func (c *Controller) Run(ctx context.Context) {
×
946
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
947
        // Otherwise, the init process should be placed after all workers have already started working
×
948
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
950
        }
×
951

952
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
953
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
954
        }
×
955

956
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
957
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
958
        }
×
959

960
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
961
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
962
        }
×
963

964
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
965
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
966
        }
×
967

968
        if err := c.InitOVN(); err != nil {
×
969
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
970
        }
×
971

972
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
973
        if err := c.syncIPCR(); err != nil {
×
974
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
975
        }
×
976

977
        if err := c.syncFinalizers(); err != nil {
×
978
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
979
        }
×
980

981
        if err := c.InitIPAM(); err != nil {
×
982
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
983
        }
×
984

985
        if err := c.syncNodeRoutes(); err != nil {
×
986
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
987
        }
×
988

989
        if err := c.syncSubnetCR(); err != nil {
×
990
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
991
        }
×
992

993
        if err := c.syncVlanCR(); err != nil {
×
994
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
995
        }
×
996

997
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
998
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
999
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1000
                }
×
1001
        }
1002

1003
        // start workers to do all the network operations
1004
        c.startWorkers(ctx)
×
1005

×
1006
        c.initResourceOnce()
×
1007
        <-ctx.Done()
×
1008
        klog.Info("Shutting down workers")
×
1009
}
1010

1011
func (c *Controller) dbStatus() {
×
1012
        const maxFailures = 5
×
1013

×
1014
        done := make(chan error, 2)
×
1015
        go func() {
×
1016
                done <- c.OVNNbClient.Echo(context.Background())
×
1017
        }()
×
1018
        go func() {
×
1019
                done <- c.OVNSbClient.Echo(context.Background())
×
1020
        }()
×
1021

1022
        resultsReceived := 0
×
1023
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1024

×
1025
        for resultsReceived < 2 {
×
1026
                select {
×
1027
                case err := <-done:
×
1028
                        resultsReceived++
×
1029
                        if err != nil {
×
1030
                                c.dbFailureCount++
×
1031
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1032
                                if c.dbFailureCount >= maxFailures {
×
1033
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1034
                                }
×
1035
                                return
×
1036
                        }
1037
                case <-timeout:
×
1038
                        c.dbFailureCount++
×
1039
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1040
                        if c.dbFailureCount >= maxFailures {
×
1041
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1042
                        }
×
1043
                        return
×
1044
                }
1045
        }
1046

1047
        if c.dbFailureCount > 0 {
×
1048
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1049
                c.dbFailureCount = 0
×
1050
        }
×
1051
}
1052

1053
func (c *Controller) shutdown() {
×
1054
        utilruntime.HandleCrash()
×
1055

×
1056
        c.addOrUpdatePodQueue.ShutDown()
×
1057
        c.deletePodQueue.ShutDown()
×
1058
        c.updatePodSecurityQueue.ShutDown()
×
1059

×
1060
        c.addNamespaceQueue.ShutDown()
×
1061

×
1062
        c.addOrUpdateSubnetQueue.ShutDown()
×
1063
        c.deleteSubnetQueue.ShutDown()
×
1064
        c.updateSubnetStatusQueue.ShutDown()
×
1065
        c.syncVirtualPortsQueue.ShutDown()
×
1066

×
1067
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1068
        c.updateIPPoolStatusQueue.ShutDown()
×
1069
        c.deleteIPPoolQueue.ShutDown()
×
1070

×
1071
        c.addNodeQueue.ShutDown()
×
1072
        c.updateNodeQueue.ShutDown()
×
1073
        c.deleteNodeQueue.ShutDown()
×
1074

×
1075
        c.addServiceQueue.ShutDown()
×
1076
        c.deleteServiceQueue.ShutDown()
×
1077
        c.updateServiceQueue.ShutDown()
×
1078
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1079

×
1080
        c.addVlanQueue.ShutDown()
×
1081
        c.delVlanQueue.ShutDown()
×
1082
        c.updateVlanQueue.ShutDown()
×
1083

×
1084
        c.addOrUpdateVpcQueue.ShutDown()
×
1085
        c.updateVpcStatusQueue.ShutDown()
×
1086
        c.delVpcQueue.ShutDown()
×
1087

×
1088
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1089
        c.initVpcNatGatewayQueue.ShutDown()
×
1090
        c.delVpcNatGatewayQueue.ShutDown()
×
1091
        c.updateVpcEipQueue.ShutDown()
×
1092
        c.updateVpcFloatingIPQueue.ShutDown()
×
1093
        c.updateVpcDnatQueue.ShutDown()
×
1094
        c.updateVpcSnatQueue.ShutDown()
×
1095
        c.updateVpcSubnetQueue.ShutDown()
×
1096

×
1097
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1098
        c.delVpcEgressGatewayQueue.ShutDown()
×
1099

×
1100
        if c.config.EnableLb {
×
1101
                c.addSwitchLBRuleQueue.ShutDown()
×
1102
                c.delSwitchLBRuleQueue.ShutDown()
×
1103
                c.updateSwitchLBRuleQueue.ShutDown()
×
1104

×
1105
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1106
                c.delVpcDNSQueue.ShutDown()
×
1107
        }
×
1108

1109
        c.addIPQueue.ShutDown()
×
1110
        c.updateIPQueue.ShutDown()
×
1111
        c.delIPQueue.ShutDown()
×
1112

×
1113
        c.addVirtualIPQueue.ShutDown()
×
1114
        c.updateVirtualIPQueue.ShutDown()
×
1115
        c.updateVirtualParentsQueue.ShutDown()
×
1116
        c.delVirtualIPQueue.ShutDown()
×
1117

×
1118
        c.addIptablesEipQueue.ShutDown()
×
1119
        c.updateIptablesEipQueue.ShutDown()
×
1120
        c.resetIptablesEipQueue.ShutDown()
×
1121
        c.delIptablesEipQueue.ShutDown()
×
1122

×
1123
        c.addIptablesFipQueue.ShutDown()
×
1124
        c.updateIptablesFipQueue.ShutDown()
×
1125
        c.delIptablesFipQueue.ShutDown()
×
1126

×
1127
        c.addIptablesDnatRuleQueue.ShutDown()
×
1128
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1129
        c.delIptablesDnatRuleQueue.ShutDown()
×
1130

×
1131
        c.addIptablesSnatRuleQueue.ShutDown()
×
1132
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1133
        c.delIptablesSnatRuleQueue.ShutDown()
×
1134

×
1135
        c.addQoSPolicyQueue.ShutDown()
×
1136
        c.updateQoSPolicyQueue.ShutDown()
×
1137
        c.delQoSPolicyQueue.ShutDown()
×
1138

×
1139
        c.addOvnEipQueue.ShutDown()
×
1140
        c.updateOvnEipQueue.ShutDown()
×
1141
        c.resetOvnEipQueue.ShutDown()
×
1142
        c.delOvnEipQueue.ShutDown()
×
1143

×
1144
        c.addOvnFipQueue.ShutDown()
×
1145
        c.updateOvnFipQueue.ShutDown()
×
1146
        c.delOvnFipQueue.ShutDown()
×
1147

×
1148
        c.addOvnSnatRuleQueue.ShutDown()
×
1149
        c.updateOvnSnatRuleQueue.ShutDown()
×
1150
        c.delOvnSnatRuleQueue.ShutDown()
×
1151

×
1152
        c.addOvnDnatRuleQueue.ShutDown()
×
1153
        c.updateOvnDnatRuleQueue.ShutDown()
×
1154
        c.delOvnDnatRuleQueue.ShutDown()
×
1155

×
1156
        if c.config.EnableNP {
×
1157
                c.updateNpQueue.ShutDown()
×
1158
                c.deleteNpQueue.ShutDown()
×
1159
        }
×
1160
        if c.config.EnableANP {
×
1161
                c.addAnpQueue.ShutDown()
×
1162
                c.updateAnpQueue.ShutDown()
×
1163
                c.deleteAnpQueue.ShutDown()
×
1164

×
1165
                c.addBanpQueue.ShutDown()
×
1166
                c.updateBanpQueue.ShutDown()
×
1167
                c.deleteBanpQueue.ShutDown()
×
1168
        }
×
1169

1170
        c.addOrUpdateSgQueue.ShutDown()
×
1171
        c.delSgQueue.ShutDown()
×
1172
        c.syncSgPortsQueue.ShutDown()
×
1173

×
1174
        c.addOrUpdateCsrQueue.ShutDown()
×
1175

×
1176
        if c.config.EnableLiveMigrationOptimize {
×
1177
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1178
        }
×
1179
}
1180

1181
func (c *Controller) startWorkers(ctx context.Context) {
×
1182
        klog.Info("Starting workers")
×
1183

×
1184
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1185
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1186
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1187

×
1188
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1189
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1190
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1191
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1192
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1193
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1194
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1195
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1196
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1197
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1198
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1199
        // add default and join subnet and wait them ready
×
1200
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1201
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1202
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1203
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1204
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1205
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1206
                klog.Infof("wait for subnets %v ready", subnets)
×
1207

×
1208
                return c.allSubnetReady(subnets...)
×
1209
        })
×
1210
        if err != nil {
×
1211
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1212
        }
×
1213

1214
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1215
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1216
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1217

×
1218
        // run node worker before handle any pods
×
1219
        for range c.config.WorkerNum {
×
1220
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1221
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1222
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1223
        }
×
1224
        for {
×
1225
                ready := true
×
1226
                time.Sleep(3 * time.Second)
×
1227
                nodes, err := c.nodesLister.List(labels.Everything())
×
1228
                if err != nil {
×
1229
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1230
                }
×
1231
                for _, node := range nodes {
×
1232
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1233
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1234
                                ready = false
×
1235
                                break
×
1236
                        }
1237
                }
1238
                if ready {
×
1239
                        break
×
1240
                }
1241
        }
1242

1243
        if c.config.EnableLb {
×
1244
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1245
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1246
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1247

×
1248
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1249
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1250
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1251

×
1252
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1253
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1254
                go wait.Until(func() {
×
1255
                        c.resyncVpcDNSConfig()
×
1256
                }, 5*time.Second, ctx.Done())
×
1257
        }
1258

1259
        for range c.config.WorkerNum {
×
1260
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1261
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1262
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1263

×
1264
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1265
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1266
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1267
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1268
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1269

×
1270
                if c.config.EnableLb {
×
1271
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1272
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1273
                }
×
1274

1275
                if c.config.EnableNP {
×
1276
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1277
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1278
                }
×
1279

1280
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1281
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1282
        }
1283

1284
        if c.config.EnableEipSnat {
×
1285
                go wait.Until(func() {
×
1286
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1287
                        c.resyncExternalGateway()
×
1288
                }, time.Second, ctx.Done())
×
1289

1290
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1291
                c.OVNNbClient.MonitorBFD()
×
1292
        }
1293
        // TODO: we should merge these two vpc nat config into one config and resync them together
1294
        go wait.Until(func() {
×
1295
                c.resyncVpcNatGwConfig()
×
1296
        }, time.Second, ctx.Done())
×
1297

1298
        go wait.Until(func() {
×
1299
                c.resyncVpcNatConfig()
×
1300
        }, time.Second, ctx.Done())
×
1301

1302
        if c.config.GCInterval != 0 {
×
1303
                go wait.Until(func() {
×
1304
                        if err := c.markAndCleanLSP(); err != nil {
×
1305
                                klog.Errorf("gc lsp error: %v", err)
×
1306
                        }
×
1307
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1308
        }
1309

1310
        go wait.Until(func() {
×
1311
                if err := c.inspectPod(); err != nil {
×
1312
                        klog.Errorf("inspection error: %v", err)
×
1313
                }
×
1314
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1315

1316
        if c.config.EnableExternalVpc {
×
1317
                go wait.Until(func() {
×
1318
                        c.syncExternalVpc()
×
1319
                }, 5*time.Second, ctx.Done())
×
1320
        }
1321

1322
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1323
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1324
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1325

×
1326
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1327
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1330

×
1331
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1332
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1334

×
1335
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1336
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1337
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1338

×
1339
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1340
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1341
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1342

×
1343
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1344

×
1345
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1348

×
1349
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1350
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1351
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1352
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1353

×
1354
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1355
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1356
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1357
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1358

×
1359
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1360
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1361
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1362

×
1363
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1364
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1365
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1366

×
1367
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1368
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1369
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1370

×
1371
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1372
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1373
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1374

×
1375
        if c.config.EnableANP {
×
1376
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1377
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1378
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1379

×
1380
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1381
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1382
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1383
        }
×
1384

1385
        if c.config.EnableLiveMigrationOptimize {
×
1386
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1387
        }
×
1388

1389
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1390

×
1391
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1392
}
1393

1394
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1395
        for _, lsName := range subnets {
2✔
1396
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1397
                if err != nil {
1✔
1398
                        klog.Error(err)
×
1399
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1400
                }
×
1401

1402
                if !exist {
2✔
1403
                        return false, nil
1✔
1404
                }
1✔
1405
        }
1406

1407
        return true, nil
1✔
1408
}
1409

1410
func (c *Controller) initResourceOnce() {
×
1411
        c.registerSubnetMetrics()
×
1412

×
1413
        if err := c.initNodeChassis(); err != nil {
×
1414
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1415
        }
×
1416

1417
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1418
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1419
        }
×
1420
        if err := c.syncSecurityGroup(); err != nil {
×
1421
                util.LogFatalAndExit(err, "failed to sync security group")
×
1422
        }
×
1423

1424
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1425
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1426
        }
×
1427

1428
        if err := c.initVpcNatGw(); err != nil {
×
1429
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1430
        }
×
1431
        if c.config.EnableLb {
×
1432
                if err := c.initVpcDNSConfig(); err != nil {
×
1433
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1434
                }
×
1435
        }
1436

1437
        // remove resources in ovndb that not exist any more in kubernetes resources
1438
        // process gc at last in case of affecting other init process
1439
        if err := c.gc(); err != nil {
×
1440
                util.LogFatalAndExit(err, "failed to run gc")
×
1441
        }
×
1442
}
1443

1444
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1445
        item, shutdown := queue.Get()
×
1446
        if shutdown {
×
1447
                return false
×
1448
        }
×
1449

1450
        err := func(item T) error {
×
1451
                defer queue.Done(item)
×
1452
                if err := handler(item); err != nil {
×
1453
                        queue.AddRateLimited(item)
×
1454
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1455
                }
×
1456
                queue.Forget(item)
×
1457
                return nil
×
1458
        }(item)
1459
        if err != nil {
×
1460
                utilruntime.HandleError(err)
×
1461
                return true
×
1462
        }
×
1463
        return true
×
1464
}
1465

1466
func getWorkItemKey(obj any) string {
×
1467
        switch v := obj.(type) {
×
1468
        case string:
×
1469
                return v
×
1470
        case *vpcService:
×
1471
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1472
        case *AdminNetworkPolicyChangedDelta:
×
1473
                return v.key
×
1474
        case *SlrInfo:
×
1475
                return v.Name
×
1476
        default:
×
1477
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1478
                if err != nil {
×
1479
                        utilruntime.HandleError(err)
×
1480
                        return ""
×
1481
                }
×
1482
                return key
×
1483
        }
1484
}
1485

1486
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1487
        return func() {
×
1488
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1489
                }
×
1490
        }
1491
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc