• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16158575117

09 Jul 2025 02:04AM UTC coverage: 21.396% (-0.1%) from 21.536%
16158575117

push

github

web-flow
Use cert-manager to issue certificates for IPSec (#5365)

* Add support for issuing IPSec tunnel certificates using cert-manager.

When cert-manager certificates are enabled, the controller no longer generates the IPSec CA cert or private key stored in the `ovn-ipsec-ca` secret. The secret should be populated with the same CA as configured with cert-manager. It still enables IPSec in OVN NB.

When cert-manager certificates are enabled the CNI daemon creates cert-manager CertificateRequest resources instead of CSRs. A cert-manager ClusterIssuer should be configured to approve and sign these CertificateRequests with a matching CA as configured in `ovn-ipsec-ca` secret. The name of the issuer to use is configurable in the CNI.

The CNI daemon now watches the `ovn-ipsec-ca` secret for changes allowing for rollout of a new trust bundle. It verifies the currently configured certificate is signed by the new bundle and if not then triggers a new certificate to be issued. The daemon now splits each certificate in the CA bundle into a separate file as strongswan is unable to parse multiple CAs from a single file.

The CNI daemon now requests a new certificate when the current certificate is at least half way to expiry based on the times in the certificate. When generating a new certificate the daemon also generates a new key just in case the previous one was leaked somehow. The certificate lifetime is also now configurable rather than lasting for a year. The CNI no longer restarts the ipsec or ovs-ipsec-monitor services when the certificate changes and just requests ipsec to reread the CA certs if they change.

To allow for the CNI daemon to keep track of the versions of its key, certificate, and CA cert files it now stores them with locally unique names on disk. Keys and certs are suffixed with the timestamp they were generated. CA files are suffixed with the k8s revision number of the `ovn-ipsec-ca` secret.

The cert manager validation webhook (if used) shoul... (continued)

0 of 449 new or added lines in 5 files covered. (0.0%)

7 existing lines in 2 files now uncovered.

10515 of 49145 relevant lines covered (21.4%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.23
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "time"
9

10
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
11
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
12
        "github.com/puzpuzpuz/xsync/v4"
13
        "golang.org/x/time/rate"
14
        corev1 "k8s.io/api/core/v1"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
        "k8s.io/apimachinery/pkg/util/wait"
19
        kubeinformers "k8s.io/client-go/informers"
20
        "k8s.io/client-go/kubernetes/scheme"
21
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
22
        appsv1 "k8s.io/client-go/listers/apps/v1"
23
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
24
        v1 "k8s.io/client-go/listers/core/v1"
25
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
26
        netv1 "k8s.io/client-go/listers/networking/v1"
27
        "k8s.io/client-go/tools/cache"
28
        "k8s.io/client-go/tools/record"
29
        "k8s.io/client-go/util/workqueue"
30
        "k8s.io/klog/v2"
31
        "k8s.io/utils/keymutex"
32
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
33
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
34
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
35

36
        "github.com/kubeovn/kube-ovn/pkg/informer"
37

38
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
39
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
40
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
41
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
42
        "github.com/kubeovn/kube-ovn/pkg/ovs"
43
        "github.com/kubeovn/kube-ovn/pkg/util"
44
)
45

46
const controllerAgentName = "kube-ovn-controller"
47

48
const (
49
        logicalSwitchKey              = "ls"
50
        logicalRouterKey              = "lr"
51
        portGroupKey                  = "pg"
52
        networkPolicyKey              = "np"
53
        sgKey                         = "sg"
54
        associatedSgKeyPrefix         = "associated_sg_"
55
        sgsKey                        = "security_groups"
56
        u2oKey                        = "u2o"
57
        adminNetworkPolicyKey         = "anp"
58
        baselineAdminNetworkPolicyKey = "banp"
59
)
60

61
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
62
type Controller struct {
63
        config *Configuration
64

65
        ipam           *ovnipam.IPAM
66
        namedPort      *NamedPort
67
        anpPrioNameMap map[int32]string
68
        anpNamePrioMap map[string]int32
69

70
        OVNNbClient ovs.NbClient
71
        OVNSbClient ovs.SbClient
72

73
        // ExternalGatewayType define external gateway type, centralized
74
        ExternalGatewayType string
75

76
        podsLister             v1.PodLister
77
        podsSynced             cache.InformerSynced
78
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
79
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
80
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
81
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
82
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
83
        podKeyMutex            keymutex.KeyMutex
84

85
        vpcsLister           kubeovnlister.VpcLister
86
        vpcSynced            cache.InformerSynced
87
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
88
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
89
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
90
        vpcKeyMutex          keymutex.KeyMutex
91

92
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
93
        vpcNatGatewaySynced           cache.InformerSynced
94
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
95
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
96
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
97
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
98
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
99
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
100
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
101
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
102
        vpcNatGwKeyMutex              keymutex.KeyMutex
103

104
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
105
        vpcEgressGatewaySynced           cache.InformerSynced
106
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
107
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
108
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
109

110
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
111
        switchLBRuleSynced      cache.InformerSynced
112
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
113
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
114
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
115

116
        vpcDNSLister           kubeovnlister.VpcDnsLister
117
        vpcDNSSynced           cache.InformerSynced
118
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
119
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
120

121
        subnetsLister           kubeovnlister.SubnetLister
122
        subnetSynced            cache.InformerSynced
123
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
124
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
125
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
126
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
127
        subnetKeyMutex          keymutex.KeyMutex
128

129
        ippoolLister            kubeovnlister.IPPoolLister
130
        ippoolSynced            cache.InformerSynced
131
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
132
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
133
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
134
        ippoolKeyMutex          keymutex.KeyMutex
135

136
        ipsLister     kubeovnlister.IPLister
137
        ipSynced      cache.InformerSynced
138
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
139
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
140
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
141

142
        virtualIpsLister          kubeovnlister.VipLister
143
        virtualIpsSynced          cache.InformerSynced
144
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
145
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
146
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
147
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
148

149
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
150
        iptablesEipSynced      cache.InformerSynced
151
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
152
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
153
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
154
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
155

156
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
157
        iptablesFipSynced      cache.InformerSynced
158
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
159
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
160
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
161

162
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
163
        iptablesDnatRuleSynced      cache.InformerSynced
164
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
165
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
166
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
167

168
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
169
        iptablesSnatRuleSynced      cache.InformerSynced
170
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
171
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
172
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
173

174
        ovnEipsLister     kubeovnlister.OvnEipLister
175
        ovnEipSynced      cache.InformerSynced
176
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
177
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
178
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
179
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
180

181
        ovnFipsLister     kubeovnlister.OvnFipLister
182
        ovnFipSynced      cache.InformerSynced
183
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
184
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
185
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
186

187
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
188
        ovnSnatRuleSynced      cache.InformerSynced
189
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
190
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
191
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192

193
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
194
        ovnDnatRuleSynced      cache.InformerSynced
195
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
196
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
197
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
198

199
        providerNetworksLister kubeovnlister.ProviderNetworkLister
200
        providerNetworkSynced  cache.InformerSynced
201

202
        vlansLister     kubeovnlister.VlanLister
203
        vlanSynced      cache.InformerSynced
204
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
205
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
206
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
207
        vlanKeyMutex    keymutex.KeyMutex
208

209
        namespacesLister  v1.NamespaceLister
210
        namespacesSynced  cache.InformerSynced
211
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
212
        nsKeyMutex        keymutex.KeyMutex
213

214
        nodesLister     v1.NodeLister
215
        nodesSynced     cache.InformerSynced
216
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
217
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
218
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
219
        nodeKeyMutex    keymutex.KeyMutex
220

221
        servicesLister     v1.ServiceLister
222
        serviceSynced      cache.InformerSynced
223
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
224
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
225
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
226
        svcKeyMutex        keymutex.KeyMutex
227

228
        endpointSlicesLister          discoveryv1.EndpointSliceLister
229
        endpointSlicesSynced          cache.InformerSynced
230
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
231
        epKeyMutex                    keymutex.KeyMutex
232

233
        deploymentsLister appsv1.DeploymentLister
234
        deploymentsSynced cache.InformerSynced
235

236
        npsLister     netv1.NetworkPolicyLister
237
        npsSynced     cache.InformerSynced
238
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
239
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
240
        npKeyMutex    keymutex.KeyMutex
241

242
        sgsLister          kubeovnlister.SecurityGroupLister
243
        sgSynced           cache.InformerSynced
244
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
245
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
246
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
247
        sgKeyMutex         keymutex.KeyMutex
248

249
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
250
        qosPolicySynced      cache.InformerSynced
251
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
252
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
253
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
254

255
        configMapsLister v1.ConfigMapLister
256
        configMapsSynced cache.InformerSynced
257

258
        anpsLister     anplister.AdminNetworkPolicyLister
259
        anpsSynced     cache.InformerSynced
260
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
261
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
262
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
263
        anpKeyMutex    keymutex.KeyMutex
264

265
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
266
        banpsSynced     cache.InformerSynced
267
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
268
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
269
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
270
        banpKeyMutex    keymutex.KeyMutex
271

272
        csrLister           certListerv1.CertificateSigningRequestLister
273
        csrSynced           cache.InformerSynced
274
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
275

276
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
277
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
278
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
279

280
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
281
        netAttachSynced          cache.InformerSynced
282
        netAttachInformerFactory netAttach.SharedInformerFactory
283

284
        recorder               record.EventRecorder
285
        informerFactory        kubeinformers.SharedInformerFactory
286
        cmInformerFactory      kubeinformers.SharedInformerFactory
287
        deployInformerFactory  kubeinformers.SharedInformerFactory
288
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
289
        anpInformerFactory     anpinformer.SharedInformerFactory
290

291
        // Database health check
292
        dbFailureCount int
293
}
294

295
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
296
        if rateLimiter == nil {
2✔
297
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
298
        }
1✔
299
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
300
}
301

302
// Run creates and runs a new ovn controller
303
func Run(ctx context.Context, config *Configuration) {
×
304
        klog.V(4).Info("Creating event broadcaster")
×
305
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
306
        eventBroadcaster.StartLogging(klog.Infof)
×
307
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
308
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
309
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
310
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
311
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
312
        )
×
313

×
314
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
315
        if err != nil {
×
316
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
317
        }
×
318

319
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
320
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
321
                        listOption.AllowWatchBookmarks = true
×
322
                }))
×
323
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
324
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
325
                        listOption.AllowWatchBookmarks = true
×
326
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
327
        // deployment informer used to list/watch vpc egress gateway workloads
328
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
329
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
330
                        listOption.AllowWatchBookmarks = true
×
331
                        listOption.LabelSelector = selector.String()
×
332
                }))
×
333
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
334
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
335
                        listOption.AllowWatchBookmarks = true
×
336
                }))
×
337
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
338
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
339
                        listOption.AllowWatchBookmarks = true
×
340
                }))
×
341

342
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
343
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
344
                        listOption.AllowWatchBookmarks = true
×
345
                }),
×
346
        )
347

348
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
349

×
350
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
351
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
352
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
353
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
354
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
355
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
356
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
357
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
358
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
359
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
360
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
361
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
362
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
363
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
364
        podInformer := informerFactory.Core().V1().Pods()
×
365
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
366
        nodeInformer := informerFactory.Core().V1().Nodes()
×
367
        serviceInformer := informerFactory.Core().V1().Services()
×
368
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
369
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
370
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
371
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
372
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
373
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
374
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
375
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
376
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
377
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
378
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
379
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
380
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
381
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
382
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
383

×
384
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
385
        controller := &Controller{
×
386
                config:             config,
×
387
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
388
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
389
                ipam:               ovnipam.NewIPAM(),
×
390
                namedPort:          NewNamedPort(),
×
391

×
392
                vpcsLister:           vpcInformer.Lister(),
×
393
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
394
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
395
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
396
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
397
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
398

×
399
                vpcNatGatewayLister:           vpcNatGatewayInformer.Lister(),
×
400
                vpcNatGatewaySynced:           vpcNatGatewayInformer.Informer().HasSynced,
×
401
                addOrUpdateVpcNatGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
402
                initVpcNatGatewayQueue:        newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
403
                delVpcNatGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
404
                updateVpcEipQueue:             newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
405
                updateVpcFloatingIPQueue:      newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
406
                updateVpcDnatQueue:            newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
407
                updateVpcSnatQueue:            newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
408
                updateVpcSubnetQueue:          newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
409
                vpcNatGwKeyMutex:              keymutex.NewHashed(numKeyLocks),
×
410

×
411
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
412
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
413
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
414
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
415
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
416

×
417
                subnetsLister:           subnetInformer.Lister(),
×
418
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
419
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
420
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
421
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
422
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
423
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
424

×
425
                ippoolLister:            ippoolInformer.Lister(),
×
426
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
427
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
428
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
429
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
430
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
431

×
432
                ipsLister:     ipInformer.Lister(),
×
433
                ipSynced:      ipInformer.Informer().HasSynced,
×
434
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
435
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
436
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
437

×
438
                virtualIpsLister:          virtualIPInformer.Lister(),
×
439
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
440
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
441
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
442
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
443
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
444

×
445
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
446
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
447
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
448
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
449
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
450
                delIptablesEipQueue:    newTypedRateLimitingQueue("DeleteIptablesEip", custCrdRateLimiter),
×
451

×
452
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
453
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
454
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
455
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
456
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
457

×
458
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
459
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
460
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
461
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
462
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
463

×
464
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
465
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
466
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
467
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
468
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
469

×
470
                vlansLister:     vlanInformer.Lister(),
×
471
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
472
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
473
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
474
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
475
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
476

×
477
                providerNetworksLister: providerNetworkInformer.Lister(),
×
478
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
479

×
480
                podsLister:          podInformer.Lister(),
×
481
                podsSynced:          podInformer.Informer().HasSynced,
×
482
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
483
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
484
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
485
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
486
                                Name:          "DeletePod",
×
487
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
488
                        },
×
489
                ),
×
490
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
491
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
492

×
493
                namespacesLister:  namespaceInformer.Lister(),
×
494
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
495
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
496
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
497

×
498
                nodesLister:     nodeInformer.Lister(),
×
499
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
500
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
501
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
502
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
503
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
504

×
505
                servicesLister:     serviceInformer.Lister(),
×
506
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
507
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
508
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
509
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
510
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
511

×
512
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
513
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
514
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
515
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
516

×
517
                deploymentsLister: deploymentInformer.Lister(),
×
518
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
519

×
520
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
521
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
522
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
523
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
524
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
525

×
526
                configMapsLister: configMapInformer.Lister(),
×
527
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
528

×
529
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
530
                sgsLister:          sgInformer.Lister(),
×
531
                sgSynced:           sgInformer.Informer().HasSynced,
×
532
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
533
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
534
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
535

×
536
                ovnEipsLister:     ovnEipInformer.Lister(),
×
537
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
538
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
539
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
540
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
541
                delOvnEipQueue:    newTypedRateLimitingQueue("DeleteOvnEip", custCrdRateLimiter),
×
542

×
543
                ovnFipsLister:     ovnFipInformer.Lister(),
×
544
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
545
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
546
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
547
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
548

×
549
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
550
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
551
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
552
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
553
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
554

×
555
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
556
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
557
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
558
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
559
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
560

×
561
                csrLister:           csrInformer.Lister(),
×
562
                csrSynced:           csrInformer.Informer().HasSynced,
×
563
                addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),
×
564

×
565
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
566
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
567
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
568

×
569
                netAttachLister:          netAttachInformer.Lister(),
×
570
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
571
                netAttachInformerFactory: attachNetInformerFactory,
×
572

×
573
                recorder:               recorder,
×
574
                informerFactory:        informerFactory,
×
575
                cmInformerFactory:      cmInformerFactory,
×
576
                deployInformerFactory:  deployInformerFactory,
×
577
                kubeovnInformerFactory: kubeovnInformerFactory,
×
578
                anpInformerFactory:     anpInformerFactory,
×
579
        }
×
580

×
581
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
582
                config.OvnNbAddr,
×
583
                config.OvnTimeout,
×
584
                config.OvsDbConnectTimeout,
×
585
                config.OvsDbInactivityTimeout,
×
586
                config.OvsDbConnectMaxRetry,
×
587
        ); err != nil {
×
588
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
589
        }
×
590
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
591
                config.OvnSbAddr,
×
592
                config.OvnTimeout,
×
593
                config.OvsDbConnectTimeout,
×
594
                config.OvsDbInactivityTimeout,
×
595
                config.OvsDbConnectMaxRetry,
×
596
        ); err != nil {
×
597
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
598
        }
×
599
        if config.EnableLb {
×
600
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
601
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
602
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
603
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
604
                        "DeleteSwitchLBRule",
×
605
                        workqueue.NewTypedMaxOfRateLimiter(
×
606
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
607
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
608
                        ),
×
609
                )
×
610
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
611
                        "UpdateSwitchLBRule",
×
612
                        workqueue.NewTypedMaxOfRateLimiter(
×
613
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
614
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
615
                        ),
×
616
                )
×
617

×
618
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
619
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
620
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
621
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
622
        }
×
623

624
        if config.EnableNP {
×
625
                controller.npsLister = npInformer.Lister()
×
626
                controller.npsSynced = npInformer.Informer().HasSynced
×
627
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
628
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
629
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
630
        }
×
631

632
        if config.EnableANP {
×
633
                controller.anpsLister = anpInformer.Lister()
×
634
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
635
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
636
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
637
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
638
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
639

×
640
                controller.banpsLister = banpInformer.Lister()
×
641
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
642
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
643
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
644
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
645
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
646
        }
×
647

648
        defer controller.shutdown()
×
649
        klog.Info("Starting OVN controller")
×
650

×
651
        // Wait for the caches to be synced before starting workers
×
652
        controller.informerFactory.Start(ctx.Done())
×
653
        controller.cmInformerFactory.Start(ctx.Done())
×
654
        controller.deployInformerFactory.Start(ctx.Done())
×
655
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
656
        controller.anpInformerFactory.Start(ctx.Done())
×
657
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
658
        controller.StartNetAttachInformerFactory(ctx)
×
659

×
660
        klog.Info("Waiting for informer caches to sync")
×
661
        cacheSyncs := []cache.InformerSynced{
×
662
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
663
                controller.vpcSynced, controller.subnetSynced,
×
664
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
665
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
666
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
667
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
668
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
669
                controller.ovnDnatRuleSynced,
×
670
        }
×
671
        if controller.config.EnableLb {
×
672
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
673
        }
×
674
        if controller.config.EnableNP {
×
675
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
676
        }
×
677
        if controller.config.EnableANP {
×
678
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced)
×
679
        }
×
680

681
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
682
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
683
        }
×
684

685
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
686
                AddFunc:    controller.enqueueAddPod,
×
687
                DeleteFunc: controller.enqueueDeletePod,
×
688
                UpdateFunc: controller.enqueueUpdatePod,
×
689
        }); err != nil {
×
690
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
691
        }
×
692

693
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
694
                AddFunc:    controller.enqueueAddNamespace,
×
695
                UpdateFunc: controller.enqueueUpdateNamespace,
×
696
                DeleteFunc: controller.enqueueDeleteNamespace,
×
697
        }); err != nil {
×
698
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
699
        }
×
700

701
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
702
                AddFunc:    controller.enqueueAddNode,
×
703
                UpdateFunc: controller.enqueueUpdateNode,
×
704
                DeleteFunc: controller.enqueueDeleteNode,
×
705
        }); err != nil {
×
706
                util.LogFatalAndExit(err, "failed to add node event handler")
×
707
        }
×
708

709
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
710
                AddFunc:    controller.enqueueAddService,
×
711
                DeleteFunc: controller.enqueueDeleteService,
×
712
                UpdateFunc: controller.enqueueUpdateService,
×
713
        }); err != nil {
×
714
                util.LogFatalAndExit(err, "failed to add service event handler")
×
715
        }
×
716

717
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
718
                AddFunc:    controller.enqueueAddEndpointSlice,
×
719
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
720
        }); err != nil {
×
721
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
722
        }
×
723

724
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
725
                AddFunc:    controller.enqueueAddDeployment,
×
726
                UpdateFunc: controller.enqueueUpdateDeployment,
×
727
        }); err != nil {
×
728
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
729
        }
×
730

731
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
732
                AddFunc:    controller.enqueueAddVpc,
×
733
                UpdateFunc: controller.enqueueUpdateVpc,
×
734
                DeleteFunc: controller.enqueueDelVpc,
×
735
        }); err != nil {
×
736
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
737
        }
×
738

739
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
740
                AddFunc:    controller.enqueueAddVpcNatGw,
×
741
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
742
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
743
        }); err != nil {
×
744
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
745
        }
×
746

747
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
748
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
749
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
750
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
751
        }); err != nil {
×
752
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
753
        }
×
754

755
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
756
                AddFunc:    controller.enqueueAddSubnet,
×
757
                UpdateFunc: controller.enqueueUpdateSubnet,
×
758
                DeleteFunc: controller.enqueueDeleteSubnet,
×
759
        }); err != nil {
×
760
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
761
        }
×
762

763
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
764
                AddFunc:    controller.enqueueAddIPPool,
×
765
                UpdateFunc: controller.enqueueUpdateIPPool,
×
766
                DeleteFunc: controller.enqueueDeleteIPPool,
×
767
        }); err != nil {
×
768
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
769
        }
×
770

771
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
772
                AddFunc:    controller.enqueueAddIP,
×
773
                UpdateFunc: controller.enqueueUpdateIP,
×
774
                DeleteFunc: controller.enqueueDelIP,
×
775
        }); err != nil {
×
776
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
777
        }
×
778

779
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
780
                AddFunc:    controller.enqueueAddVlan,
×
781
                DeleteFunc: controller.enqueueDelVlan,
×
782
                UpdateFunc: controller.enqueueUpdateVlan,
×
783
        }); err != nil {
×
784
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
785
        }
×
786

787
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
788
                AddFunc:    controller.enqueueAddSg,
×
789
                DeleteFunc: controller.enqueueDeleteSg,
×
790
                UpdateFunc: controller.enqueueUpdateSg,
×
791
        }); err != nil {
×
792
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
793
        }
×
794

795
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
796
                AddFunc:    controller.enqueueAddVirtualIP,
×
797
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
798
                DeleteFunc: controller.enqueueDelVirtualIP,
×
799
        }); err != nil {
×
800
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
801
        }
×
802

803
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
804
                AddFunc:    controller.enqueueAddIptablesEip,
×
805
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
806
                DeleteFunc: controller.enqueueDelIptablesEip,
×
807
        }); err != nil {
×
808
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
809
        }
×
810

811
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
812
                AddFunc:    controller.enqueueAddIptablesFip,
×
813
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
814
                DeleteFunc: controller.enqueueDelIptablesFip,
×
815
        }); err != nil {
×
816
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
817
        }
×
818

819
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
820
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
821
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
822
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
823
        }); err != nil {
×
824
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
825
        }
×
826

827
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
828
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
829
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
830
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
831
        }); err != nil {
×
832
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
833
        }
×
834

835
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
836
                AddFunc:    controller.enqueueAddOvnEip,
×
837
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
838
                DeleteFunc: controller.enqueueDelOvnEip,
×
839
        }); err != nil {
×
840
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
841
        }
×
842

843
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
844
                AddFunc:    controller.enqueueAddOvnFip,
×
845
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
846
                DeleteFunc: controller.enqueueDelOvnFip,
×
847
        }); err != nil {
×
848
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
849
        }
×
850

851
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
852
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
853
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
854
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
855
        }); err != nil {
×
856
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
857
        }
×
858

859
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
860
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
861
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
862
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
863
        }); err != nil {
×
864
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
865
        }
×
866

867
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
868
                AddFunc:    controller.enqueueAddQoSPolicy,
×
869
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
870
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
871
        }); err != nil {
×
872
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
873
        }
×
874

875
        if config.EnableLb {
×
876
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
877
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
878
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
879
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
880
                }); err != nil {
×
881
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
882
                }
×
883

884
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
885
                        AddFunc:    controller.enqueueAddVpcDNS,
×
886
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
887
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
888
                }); err != nil {
×
889
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
890
                }
×
891
        }
892

893
        if config.EnableNP {
×
894
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
895
                        AddFunc:    controller.enqueueAddNp,
×
896
                        UpdateFunc: controller.enqueueUpdateNp,
×
897
                        DeleteFunc: controller.enqueueDeleteNp,
×
898
                }); err != nil {
×
899
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
900
                }
×
901
        }
902

903
        if config.EnableANP {
×
904
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                        AddFunc:    controller.enqueueAddAnp,
×
906
                        UpdateFunc: controller.enqueueUpdateAnp,
×
907
                        DeleteFunc: controller.enqueueDeleteAnp,
×
908
                }); err != nil {
×
909
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
910
                }
×
911

912
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                        AddFunc:    controller.enqueueAddBanp,
×
914
                        UpdateFunc: controller.enqueueUpdateBanp,
×
915
                        DeleteFunc: controller.enqueueDeleteBanp,
×
916
                }); err != nil {
×
917
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
918
                }
×
919

920
                controller.anpPrioNameMap = make(map[int32]string, 100)
×
921
                controller.anpNamePrioMap = make(map[string]int32, 100)
×
922
        }
923

924
        if config.EnableOVNIPSec {
×
925
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
926
                        AddFunc:    controller.enqueueAddCsr,
×
927
                        UpdateFunc: controller.enqueueUpdateCsr,
×
928
                        // no need to add delete func for csr
×
929
                }); err != nil {
×
930
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
931
                }
×
932
        }
933

934
        controller.Run(ctx)
×
935
}
936

937
// Run will set up the event handlers for types we are interested in, as well
938
// as syncing informer caches and starting workers. It will block until stopCh
939
// is closed, at which point it will shutdown the workqueue and wait for
940
// workers to finish processing their current work items.
941
func (c *Controller) Run(ctx context.Context) {
×
942
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
943
        // Otherwise, the init process should be placed after all workers have already started working
×
944
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
945
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
946
        }
×
947

948
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
950
        }
×
951

952
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
953
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
954
        }
×
955

956
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
957
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
958
        }
×
959

960
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
961
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
962
        }
×
963

964
        if err := c.InitOVN(); err != nil {
×
965
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
966
        }
×
967

968
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
969
        if err := c.syncIPCR(); err != nil {
×
970
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
971
        }
×
972

973
        if err := c.syncFinalizers(); err != nil {
×
974
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
975
        }
×
976

977
        if err := c.InitIPAM(); err != nil {
×
978
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
979
        }
×
980

981
        if err := c.syncNodeRoutes(); err != nil {
×
982
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
983
        }
×
984

985
        if err := c.syncSubnetCR(); err != nil {
×
986
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
987
        }
×
988

989
        if err := c.syncVlanCR(); err != nil {
×
990
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
991
        }
×
992

NEW
993
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
994
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
995
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
996
                }
×
997
        }
998

999
        // start workers to do all the network operations
1000
        c.startWorkers(ctx)
×
1001

×
1002
        c.initResourceOnce()
×
1003
        <-ctx.Done()
×
1004
        klog.Info("Shutting down workers")
×
1005
}
1006

1007
func (c *Controller) dbStatus() {
×
1008
        const maxFailures = 5
×
1009

×
1010
        done := make(chan error, 2)
×
1011
        go func() {
×
1012
                done <- c.OVNNbClient.Echo(context.Background())
×
1013
        }()
×
1014
        go func() {
×
1015
                done <- c.OVNSbClient.Echo(context.Background())
×
1016
        }()
×
1017

1018
        resultsReceived := 0
×
1019
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1020

×
1021
        for resultsReceived < 2 {
×
1022
                select {
×
1023
                case err := <-done:
×
1024
                        resultsReceived++
×
1025
                        if err != nil {
×
1026
                                c.dbFailureCount++
×
1027
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1028
                                if c.dbFailureCount >= maxFailures {
×
1029
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1030
                                }
×
1031
                                return
×
1032
                        }
1033
                case <-timeout:
×
1034
                        c.dbFailureCount++
×
1035
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1036
                        if c.dbFailureCount >= maxFailures {
×
1037
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1038
                        }
×
1039
                        return
×
1040
                }
1041
        }
1042

1043
        if c.dbFailureCount > 0 {
×
1044
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1045
                c.dbFailureCount = 0
×
1046
        }
×
1047
}
1048

1049
func (c *Controller) shutdown() {
×
1050
        utilruntime.HandleCrash()
×
1051

×
1052
        c.addOrUpdatePodQueue.ShutDown()
×
1053
        c.deletePodQueue.ShutDown()
×
1054
        c.updatePodSecurityQueue.ShutDown()
×
1055

×
1056
        c.addNamespaceQueue.ShutDown()
×
1057

×
1058
        c.addOrUpdateSubnetQueue.ShutDown()
×
1059
        c.deleteSubnetQueue.ShutDown()
×
1060
        c.updateSubnetStatusQueue.ShutDown()
×
1061
        c.syncVirtualPortsQueue.ShutDown()
×
1062

×
1063
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1064
        c.updateIPPoolStatusQueue.ShutDown()
×
1065
        c.deleteIPPoolQueue.ShutDown()
×
1066

×
1067
        c.addNodeQueue.ShutDown()
×
1068
        c.updateNodeQueue.ShutDown()
×
1069
        c.deleteNodeQueue.ShutDown()
×
1070

×
1071
        c.addServiceQueue.ShutDown()
×
1072
        c.deleteServiceQueue.ShutDown()
×
1073
        c.updateServiceQueue.ShutDown()
×
1074
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1075

×
1076
        c.addVlanQueue.ShutDown()
×
1077
        c.delVlanQueue.ShutDown()
×
1078
        c.updateVlanQueue.ShutDown()
×
1079

×
1080
        c.addOrUpdateVpcQueue.ShutDown()
×
1081
        c.updateVpcStatusQueue.ShutDown()
×
1082
        c.delVpcQueue.ShutDown()
×
1083

×
1084
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1085
        c.initVpcNatGatewayQueue.ShutDown()
×
1086
        c.delVpcNatGatewayQueue.ShutDown()
×
1087
        c.updateVpcEipQueue.ShutDown()
×
1088
        c.updateVpcFloatingIPQueue.ShutDown()
×
1089
        c.updateVpcDnatQueue.ShutDown()
×
1090
        c.updateVpcSnatQueue.ShutDown()
×
1091
        c.updateVpcSubnetQueue.ShutDown()
×
1092

×
1093
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1094
        c.delVpcEgressGatewayQueue.ShutDown()
×
1095

×
1096
        if c.config.EnableLb {
×
1097
                c.addSwitchLBRuleQueue.ShutDown()
×
1098
                c.delSwitchLBRuleQueue.ShutDown()
×
1099
                c.updateSwitchLBRuleQueue.ShutDown()
×
1100

×
1101
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1102
                c.delVpcDNSQueue.ShutDown()
×
1103
        }
×
1104

1105
        c.addIPQueue.ShutDown()
×
1106
        c.updateIPQueue.ShutDown()
×
1107
        c.delIPQueue.ShutDown()
×
1108

×
1109
        c.addVirtualIPQueue.ShutDown()
×
1110
        c.updateVirtualIPQueue.ShutDown()
×
1111
        c.updateVirtualParentsQueue.ShutDown()
×
1112
        c.delVirtualIPQueue.ShutDown()
×
1113

×
1114
        c.addIptablesEipQueue.ShutDown()
×
1115
        c.updateIptablesEipQueue.ShutDown()
×
1116
        c.resetIptablesEipQueue.ShutDown()
×
1117
        c.delIptablesEipQueue.ShutDown()
×
1118

×
1119
        c.addIptablesFipQueue.ShutDown()
×
1120
        c.updateIptablesFipQueue.ShutDown()
×
1121
        c.delIptablesFipQueue.ShutDown()
×
1122

×
1123
        c.addIptablesDnatRuleQueue.ShutDown()
×
1124
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1125
        c.delIptablesDnatRuleQueue.ShutDown()
×
1126

×
1127
        c.addIptablesSnatRuleQueue.ShutDown()
×
1128
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1129
        c.delIptablesSnatRuleQueue.ShutDown()
×
1130

×
1131
        c.addQoSPolicyQueue.ShutDown()
×
1132
        c.updateQoSPolicyQueue.ShutDown()
×
1133
        c.delQoSPolicyQueue.ShutDown()
×
1134

×
1135
        c.addOvnEipQueue.ShutDown()
×
1136
        c.updateOvnEipQueue.ShutDown()
×
1137
        c.resetOvnEipQueue.ShutDown()
×
1138
        c.delOvnEipQueue.ShutDown()
×
1139

×
1140
        c.addOvnFipQueue.ShutDown()
×
1141
        c.updateOvnFipQueue.ShutDown()
×
1142
        c.delOvnFipQueue.ShutDown()
×
1143

×
1144
        c.addOvnSnatRuleQueue.ShutDown()
×
1145
        c.updateOvnSnatRuleQueue.ShutDown()
×
1146
        c.delOvnSnatRuleQueue.ShutDown()
×
1147

×
1148
        c.addOvnDnatRuleQueue.ShutDown()
×
1149
        c.updateOvnDnatRuleQueue.ShutDown()
×
1150
        c.delOvnDnatRuleQueue.ShutDown()
×
1151

×
1152
        if c.config.EnableNP {
×
1153
                c.updateNpQueue.ShutDown()
×
1154
                c.deleteNpQueue.ShutDown()
×
1155
        }
×
1156
        if c.config.EnableANP {
×
1157
                c.addAnpQueue.ShutDown()
×
1158
                c.updateAnpQueue.ShutDown()
×
1159
                c.deleteAnpQueue.ShutDown()
×
1160

×
1161
                c.addBanpQueue.ShutDown()
×
1162
                c.updateBanpQueue.ShutDown()
×
1163
                c.deleteBanpQueue.ShutDown()
×
1164
        }
×
1165

1166
        c.addOrUpdateSgQueue.ShutDown()
×
1167
        c.delSgQueue.ShutDown()
×
1168
        c.syncSgPortsQueue.ShutDown()
×
1169

×
1170
        c.addOrUpdateCsrQueue.ShutDown()
×
1171

×
1172
        if c.config.EnableLiveMigrationOptimize {
×
1173
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1174
        }
×
1175
}
1176

1177
func (c *Controller) startWorkers(ctx context.Context) {
×
1178
        klog.Info("Starting workers")
×
1179

×
1180
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1181
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1182
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1183

×
1184
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1185
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1186
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1187
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1188
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1189
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1190
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1191
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1192
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1193
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1194
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1195
        // add default and join subnet and wait them ready
×
1196
        go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1197
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1198
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1199
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1200
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1201
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1202
                klog.Infof("wait for subnets %v ready", subnets)
×
1203

×
1204
                return c.allSubnetReady(subnets...)
×
1205
        })
×
1206
        if err != nil {
×
1207
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1208
        }
×
1209

1210
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1211
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1212
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1213

×
1214
        // run node worker before handle any pods
×
1215
        for range c.config.WorkerNum {
×
1216
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1217
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1218
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1219
        }
×
1220
        for {
×
1221
                ready := true
×
1222
                time.Sleep(3 * time.Second)
×
1223
                nodes, err := c.nodesLister.List(labels.Everything())
×
1224
                if err != nil {
×
1225
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1226
                }
×
1227
                for _, node := range nodes {
×
1228
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1229
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1230
                                ready = false
×
1231
                                break
×
1232
                        }
1233
                }
1234
                if ready {
×
1235
                        break
×
1236
                }
1237
        }
1238

1239
        if c.config.EnableLb {
×
1240
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1241
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1242
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1243

×
1244
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1245
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1246
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1247

×
1248
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1249
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1250
                go wait.Until(func() {
×
1251
                        c.resyncVpcDNSConfig()
×
1252
                }, 5*time.Second, ctx.Done())
×
1253
        }
1254

1255
        for range c.config.WorkerNum {
×
1256
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1257
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1258
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1259

×
1260
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1261
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1262
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1263
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1264
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1265

×
1266
                if c.config.EnableLb {
×
1267
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1268
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1269
                }
×
1270

1271
                if c.config.EnableNP {
×
1272
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1273
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1274
                }
×
1275

1276
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1277
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1278
        }
1279

1280
        if c.config.EnableEipSnat {
×
1281
                go wait.Until(func() {
×
1282
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1283
                        c.resyncExternalGateway()
×
1284
                }, time.Second, ctx.Done())
×
1285

1286
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1287
                c.OVNNbClient.MonitorBFD()
×
1288
        }
1289
        // TODO: we should merge these two vpc nat config into one config and resync them together
1290
        go wait.Until(func() {
×
1291
                c.resyncVpcNatGwConfig()
×
1292
        }, time.Second, ctx.Done())
×
1293

1294
        go wait.Until(func() {
×
1295
                c.resyncVpcNatConfig()
×
1296
        }, time.Second, ctx.Done())
×
1297

1298
        if c.config.GCInterval != 0 {
×
1299
                go wait.Until(func() {
×
1300
                        if err := c.markAndCleanLSP(); err != nil {
×
1301
                                klog.Errorf("gc lsp error: %v", err)
×
1302
                        }
×
1303
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1304
        }
1305

1306
        go wait.Until(func() {
×
1307
                if err := c.inspectPod(); err != nil {
×
1308
                        klog.Errorf("inspection error: %v", err)
×
1309
                }
×
1310
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1311

1312
        if c.config.EnableExternalVpc {
×
1313
                go wait.Until(func() {
×
1314
                        c.syncExternalVpc()
×
1315
                }, 5*time.Second, ctx.Done())
×
1316
        }
1317

1318
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1319
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1320
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1321

×
1322
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1323
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1324
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1325
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1326

×
1327
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1330

×
1331
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1332
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1334

×
1335
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1336
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1337
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1338

×
1339
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1340

×
1341
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1344

×
1345
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1348
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1349

×
1350
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1351
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1352
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1353
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1354

×
1355
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1356
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1357
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1358

×
1359
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1360
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1361
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1362

×
1363
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1364
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1365
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1366

×
1367
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1368
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1369
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1370

×
1371
        if c.config.EnableANP {
×
1372
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1373
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1374
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1375

×
1376
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1377
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1378
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1379
        }
×
1380

1381
        if c.config.EnableLiveMigrationOptimize {
×
1382
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1383
        }
×
1384

1385
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1386

×
1387
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1388
}
1389

1390
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1391
        for _, lsName := range subnets {
2✔
1392
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1393
                if err != nil {
1✔
1394
                        klog.Error(err)
×
1395
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1396
                }
×
1397

1398
                if !exist {
2✔
1399
                        return false, nil
1✔
1400
                }
1✔
1401
        }
1402

1403
        return true, nil
1✔
1404
}
1405

1406
func (c *Controller) initResourceOnce() {
×
1407
        c.registerSubnetMetrics()
×
1408

×
1409
        if err := c.initNodeChassis(); err != nil {
×
1410
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1411
        }
×
1412

1413
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1414
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1415
        }
×
1416
        if err := c.syncSecurityGroup(); err != nil {
×
1417
                util.LogFatalAndExit(err, "failed to sync security group")
×
1418
        }
×
1419

1420
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1421
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1422
        }
×
1423

1424
        if err := c.initVpcNatGw(); err != nil {
×
1425
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1426
        }
×
1427
        if c.config.EnableLb {
×
1428
                if err := c.initVpcDNSConfig(); err != nil {
×
1429
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1430
                }
×
1431
        }
1432

1433
        // remove resources in ovndb that not exist any more in kubernetes resources
1434
        // process gc at last in case of affecting other init process
1435
        if err := c.gc(); err != nil {
×
1436
                util.LogFatalAndExit(err, "failed to run gc")
×
1437
        }
×
1438
}
1439

1440
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1441
        item, shutdown := queue.Get()
×
1442
        if shutdown {
×
1443
                return false
×
1444
        }
×
1445

1446
        err := func(item T) error {
×
1447
                defer queue.Done(item)
×
1448
                if err := handler(item); err != nil {
×
1449
                        queue.AddRateLimited(item)
×
1450
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1451
                }
×
1452
                queue.Forget(item)
×
1453
                return nil
×
1454
        }(item)
1455
        if err != nil {
×
1456
                utilruntime.HandleError(err)
×
1457
                return true
×
1458
        }
×
1459
        return true
×
1460
}
1461

1462
func getWorkItemKey(obj any) string {
×
1463
        switch v := obj.(type) {
×
1464
        case string:
×
1465
                return v
×
1466
        case *vpcService:
×
1467
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1468
        case *AdminNetworkPolicyChangedDelta:
×
1469
                return v.key
×
1470
        case *SlrInfo:
×
1471
                return v.Name
×
1472
        default:
×
1473
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1474
                if err != nil {
×
1475
                        utilruntime.HandleError(err)
×
1476
                        return ""
×
1477
                }
×
1478
                return key
×
1479
        }
1480
}
1481

1482
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1483
        return func() {
×
1484
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1485
                }
×
1486
        }
1487
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc