• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27260640629

10 Jun 2026 07:32AM UTC coverage: 25.942% (+0.08%) from 25.866%
27260640629

push

github

web-flow
fix(controller): correct EIP v6 label update in SNAT/DNAT status patch (#6846)

In patchOvnSnatStatus the label replace branch assigned the IPv4 EIP
value to the EipV6IpLabel, and in patchOvnDnatStatus the replace branch
did not update EipV6IpLabel at all, leaving it stale. Both diverged
from their add branches. Also trigger the replace branch when only the
v6 EIP differs, so IPv6-only EIP changes are not missed.

A wrong v6 label breaks the webhook's isOvnEipInUse check, which lists
NAT rules by matching both EipV4IpLabel and EipV6IpLabel, allowing the
deletion of an OvnEip that is still referenced.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>

0 of 4 new or added lines in 2 files covered. (0.0%)

171 existing lines in 2 files now uncovered.

15033 of 57948 relevant lines covered (25.94%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.12
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        kubeinformers "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        appsv1 "k8s.io/client-go/listers/apps/v1"
25
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
26
        v1 "k8s.io/client-go/listers/core/v1"
27
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
28
        netv1 "k8s.io/client-go/listers/networking/v1"
29
        "k8s.io/client-go/tools/cache"
30
        "k8s.io/client-go/tools/record"
31
        "k8s.io/client-go/util/workqueue"
32
        "k8s.io/klog/v2"
33
        "k8s.io/utils/keymutex"
34
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
35
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
36
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
37
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
38
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
39

40
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
41
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
42
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
43
        "github.com/kubeovn/kube-ovn/pkg/informer"
44
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
45
        "github.com/kubeovn/kube-ovn/pkg/ovs"
46
        "github.com/kubeovn/kube-ovn/pkg/util"
47
)
48

49
const controllerAgentName = "kube-ovn-controller"
50

51
const (
52
        logicalSwitchKey              = "ls"
53
        logicalRouterKey              = "lr"
54
        portGroupKey                  = "pg"
55
        networkPolicyKey              = "np"
56
        sgKey                         = "sg"
57
        sgsKey                        = "security_groups"
58
        u2oKey                        = "u2o"
59
        adminNetworkPolicyKey         = "anp"
60
        baselineAdminNetworkPolicyKey = "banp"
61
        ippoolKey                     = "ippool"
62
        clusterNetworkPolicyKey       = "cnp"
63
)
64

65
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
66
type Controller struct {
67
        config *Configuration
68

69
        ipam             *ovnipam.IPAM
70
        namedPort        *NamedPort
71
        anpPrioNameMap   map[int32]string
72
        anpNamePrioMap   map[string]int32
73
        bnpPrioNameMap   map[int32]string
74
        bnpNamePrioMap   map[string]int32
75
        priorityMapMutex sync.RWMutex
76

77
        OVNNbClient ovs.NbClient
78
        OVNSbClient ovs.SbClient
79

80
        // ExternalGatewayType define external gateway type, centralized
81
        ExternalGatewayType string
82

83
        podsLister             v1.PodLister
84
        podsSynced             cache.InformerSynced
85
        podIndexer             cache.Indexer
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        // bgpConfLister/evpnConfLister are published asynchronously by the
121
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
122
        // read by VEG worker goroutines. Atomic pointers keep the read/write
123
        // race-free without locking the hot reconcile path.
124
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
125
        bgpConfSynced  cache.InformerSynced
126
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
127
        evpnConfSynced cache.InformerSynced
128

129
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
130
        switchLBRuleSynced      cache.InformerSynced
131
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
132
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
133
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
134

135
        vpcDNSLister           kubeovnlister.VpcDnsLister
136
        vpcDNSSynced           cache.InformerSynced
137
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
138
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
139

140
        subnetsLister           kubeovnlister.SubnetLister
141
        subnetSynced            cache.InformerSynced
142
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
143
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
144
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
145
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
146
        subnetKeyMutex          keymutex.KeyMutex
147

148
        ippoolLister            kubeovnlister.IPPoolLister
149
        ippoolSynced            cache.InformerSynced
150
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
151
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
152
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
153
        ippoolKeyMutex          keymutex.KeyMutex
154

155
        ipsLister     kubeovnlister.IPLister
156
        ipSynced      cache.InformerSynced
157
        ipIndexer     cache.Indexer
158
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
159
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
160
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
161

162
        virtualIpsLister          kubeovnlister.VipLister
163
        virtualIpsSynced          cache.InformerSynced
164
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
165
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
166
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
167
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
168

169
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
170
        iptablesEipSynced      cache.InformerSynced
171
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
172
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
173
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
174
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
175

176
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
177
        iptablesFipSynced      cache.InformerSynced
178
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
179
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
180
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
181

182
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
183
        iptablesDnatRuleSynced      cache.InformerSynced
184
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
185
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
186
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
187

188
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
189
        iptablesSnatRuleSynced      cache.InformerSynced
190
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
191
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
192
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
193

194
        ovnEipsLister     kubeovnlister.OvnEipLister
195
        ovnEipSynced      cache.InformerSynced
196
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
197
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
198
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
199
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
200

201
        ovnFipsLister     kubeovnlister.OvnFipLister
202
        ovnFipSynced      cache.InformerSynced
203
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
204
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
205
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
206

207
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
208
        ovnSnatRuleSynced      cache.InformerSynced
209
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
210
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
211
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
212

213
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
214
        ovnDnatRuleSynced      cache.InformerSynced
215
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
216
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
217
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
218

219
        providerNetworksLister kubeovnlister.ProviderNetworkLister
220
        providerNetworkSynced  cache.InformerSynced
221

222
        vlansLister     kubeovnlister.VlanLister
223
        vlanSynced      cache.InformerSynced
224
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
225
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
226
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
227
        vlanKeyMutex    keymutex.KeyMutex
228

229
        namespacesLister  v1.NamespaceLister
230
        namespacesSynced  cache.InformerSynced
231
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
232
        nsKeyMutex        keymutex.KeyMutex
233

234
        nodesLister     v1.NodeLister
235
        nodesSynced     cache.InformerSynced
236
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
237
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
238
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
239
        nodeKeyMutex    keymutex.KeyMutex
240

241
        servicesLister     v1.ServiceLister
242
        serviceSynced      cache.InformerSynced
243
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
244
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
245
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
246
        svcKeyMutex        keymutex.KeyMutex
247

248
        endpointSlicesLister          discoveryv1.EndpointSliceLister
249
        endpointSlicesSynced          cache.InformerSynced
250
        epsIndexer                    cache.Indexer
251
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
252
        epKeyMutex                    keymutex.KeyMutex
253

254
        deploymentsLister appsv1.DeploymentLister
255
        deploymentsSynced cache.InformerSynced
256

257
        npsLister     netv1.NetworkPolicyLister
258
        npsSynced     cache.InformerSynced
259
        npIndexer     cache.Indexer
260
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
261
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
262
        npKeyMutex    keymutex.KeyMutex
263

264
        sgsLister          kubeovnlister.SecurityGroupLister
265
        sgSynced           cache.InformerSynced
266
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
267
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
268
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
269
        sgKeyMutex         keymutex.KeyMutex
270

271
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
272
        qosPolicySynced      cache.InformerSynced
273
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
274
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
275
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
276

277
        configMapsLister v1.ConfigMapLister
278
        configMapsSynced cache.InformerSynced
279

280
        anpsLister     anplister.AdminNetworkPolicyLister
281
        anpsSynced     cache.InformerSynced
282
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
283
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
284
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
285
        anpKeyMutex    keymutex.KeyMutex
286

287
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
288
        dnsNameResolverIndexer          cache.Indexer
289
        dnsNameResolversSynced          cache.InformerSynced
290
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
291
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
292

293
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
294
        banpsSynced     cache.InformerSynced
295
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
296
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
297
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
298
        banpKeyMutex    keymutex.KeyMutex
299

300
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
301
        cnpsSynced     cache.InformerSynced
302
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
303
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
304
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
305
        cnpKeyMutex    keymutex.KeyMutex
306

307
        csrLister           certListerv1.CertificateSigningRequestLister
308
        csrSynced           cache.InformerSynced
309
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
310

311
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
312
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
313
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
314

315
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
316
        netAttachSynced          cache.InformerSynced
317
        netAttachInformerFactory netAttach.SharedInformerFactory
318

319
        serviceCIDRStore           *util.ServiceCIDRStore
320
        serviceCIDRLister          netv1.ServiceCIDRLister
321
        serviceCIDRSynced          cache.InformerSynced
322
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
323

324
        recorder               record.EventRecorder
325
        informerFactory        kubeinformers.SharedInformerFactory
326
        cmInformerFactory      kubeinformers.SharedInformerFactory
327
        deployInformerFactory  kubeinformers.SharedInformerFactory
328
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
329
        anpInformerFactory     anpinformer.SharedInformerFactory
330

331
        // Database health check
332
        dbFailureCount int
333

334
        distributedSubnetNeedSync atomic.Bool
335
}
336

337
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
338
        if rateLimiter == nil {
2✔
339
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
340
        }
1✔
341
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
342
}
343

344
// Run creates and runs a new ovn controller
345
func Run(ctx context.Context, config *Configuration) {
×
346
        klog.V(4).Info("Creating event broadcaster")
×
347
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
348
        eventBroadcaster.StartLogging(klog.Infof)
×
349
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
350
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
351
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
352
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
353
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
354
        )
×
355

×
356
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
357
        if err != nil {
×
358
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
359
        }
×
360

361
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
362
                kubeinformers.WithTransform(util.TrimPodForController),
×
363
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
364
                        listOption.AllowWatchBookmarks = true
×
365
                }))
×
366
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
367
                kubeinformers.WithNamespace(config.PodNamespace),
×
368
                kubeinformers.WithTransform(util.TrimManagedFields),
×
369
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
370
                        listOption.AllowWatchBookmarks = true
×
371
                }))
×
372
        // deployment informer used to list/watch vpc egress gateway workloads
373
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
374
                kubeinformers.WithTransform(util.TrimManagedFields),
×
375
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
376
                        listOption.AllowWatchBookmarks = true
×
377
                        listOption.LabelSelector = selector.String()
×
378
                }))
×
379
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
380
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
381
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
382
                        listOption.AllowWatchBookmarks = true
×
383
                }))
×
384
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
385
                anpinformer.WithTransform(util.TrimManagedFields),
×
386
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
387
                        listOption.AllowWatchBookmarks = true
×
388
                }))
×
389
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
390
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
391
                        listOption.AllowWatchBookmarks = true
×
392
                }),
×
393
        )
394
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
395
                informer.WithTransform(util.TrimManagedFields),
×
396
        )
×
397
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
398
        // failed list/watch does not contaminate the main informer factory.
×
399
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
400
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
401
                        listOption.AllowWatchBookmarks = true
×
402
                }),
×
403
        )
404

405
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
406
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
407
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
408
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
409
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
410
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
411
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
412
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
413
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
414
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
415
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
416
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
417
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
418
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
419
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
420
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
421
        podInformer := informerFactory.Core().V1().Pods()
×
422
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
423
        nodeInformer := informerFactory.Core().V1().Nodes()
×
424
        serviceInformer := informerFactory.Core().V1().Services()
×
425
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
426
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
427
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
428
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
429
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
430
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
431
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
432
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
433
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
434
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
435
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
436
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
437
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
438
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
439
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
440
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
441
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
442

×
443
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
444
        controller := &Controller{
×
445
                config:             config,
×
446
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
447
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
448
                ipam:               ovnipam.NewIPAM(),
×
449
                namedPort:          NewNamedPort(),
×
450

×
451
                vpcsLister:           vpcInformer.Lister(),
×
452
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
453
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
454
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
455
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
456
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
457
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
458

×
459
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
460
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
461
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
462
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
463
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
464
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
465
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
466
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
467
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
468
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
469
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
470
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
471
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
472
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
473
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
474
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
475
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
476

×
477
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
478
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
479

×
480
                subnetsLister:           subnetInformer.Lister(),
×
481
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
482
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
483
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
484
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
485
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
486
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
487

×
488
                ippoolLister:            ippoolInformer.Lister(),
×
489
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
490
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
491
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
492
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
493
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
494

×
495
                ipsLister:     ipInformer.Lister(),
×
496
                ipSynced:      ipInformer.Informer().HasSynced,
×
497
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
498
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
499
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
500

×
501
                virtualIpsLister:          virtualIPInformer.Lister(),
×
502
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
503
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
504
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
505
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
506
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
507

×
508
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
509
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
510
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
511
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
512
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
513
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
514

×
515
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
516
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
517
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
518
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
519
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
520

×
521
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
522
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
523
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
524
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
525
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
526

×
527
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
528
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
529
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
530
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
531
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
532

×
533
                vlansLister:     vlanInformer.Lister(),
×
534
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
535
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
536
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
537
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
538
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
539

×
540
                providerNetworksLister: providerNetworkInformer.Lister(),
×
541
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
542

×
543
                podsLister:          podInformer.Lister(),
×
544
                podsSynced:          podInformer.Informer().HasSynced,
×
545
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
546
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
547
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
548
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
549
                                Name:          "DeletePod",
×
550
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
551
                        },
×
552
                ),
×
553
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
554
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
555

×
556
                namespacesLister:  namespaceInformer.Lister(),
×
557
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
558
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
559
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
560

×
561
                nodesLister:     nodeInformer.Lister(),
×
562
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
563
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
564
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
565
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
566
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
567

×
568
                servicesLister:     serviceInformer.Lister(),
×
569
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
570
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
571
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
572
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
573
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
574

×
575
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
576
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
577
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
578
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
579

×
580
                deploymentsLister: deploymentInformer.Lister(),
×
581
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
582

×
583
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
584
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
585
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
586
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
587
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
588

×
589
                configMapsLister: configMapInformer.Lister(),
×
590
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
591

×
592
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
593
                sgsLister:          sgInformer.Lister(),
×
594
                sgSynced:           sgInformer.Informer().HasSynced,
×
595
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
596
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
597
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
598

×
599
                ovnEipsLister:     ovnEipInformer.Lister(),
×
600
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
601
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
602
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
603
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
604
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
605

×
606
                ovnFipsLister:     ovnFipInformer.Lister(),
×
607
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
608
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
609
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
610
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
611

×
612
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
613
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
614
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
615
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
616
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
617

×
618
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
619
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
620
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
621
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
622
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
623

×
624
                csrLister:           csrInformer.Lister(),
×
625
                csrSynced:           csrInformer.Informer().HasSynced,
×
626
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
627

×
628
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
629
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
630
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
631

×
632
                netAttachLister:          netAttachInformer.Lister(),
×
633
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
634
                netAttachInformerFactory: attachNetInformerFactory,
×
635

×
636
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
637
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
638

×
639
                recorder:               recorder,
×
640
                informerFactory:        informerFactory,
×
641
                cmInformerFactory:      cmInformerFactory,
×
642
                deployInformerFactory:  deployInformerFactory,
×
643
                kubeovnInformerFactory: kubeovnInformerFactory,
×
644
                anpInformerFactory:     anpInformerFactory,
×
645
        }
×
646

×
647
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
648
                config.OvnNbAddr,
×
649
                config.OvnTimeout,
×
650
                config.OvsDbConnectTimeout,
×
651
                config.OvsDbInactivityTimeout,
×
652
                config.OvsDbConnectMaxRetry,
×
653
        ); err != nil {
×
654
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
655
        }
×
656
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
657
                config.OvnSbAddr,
×
658
                config.OvnTimeout,
×
659
                config.OvsDbConnectTimeout,
×
660
                config.OvsDbInactivityTimeout,
×
661
                config.OvsDbConnectMaxRetry,
×
662
        ); err != nil {
×
663
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
664
        }
×
665
        if config.EnableLb {
×
666
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
667
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
668
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
669
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
670
                        "DeleteSwitchLBRule",
×
671
                        workqueue.NewTypedMaxOfRateLimiter(
×
672
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
673
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
674
                        ),
×
675
                )
×
676
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
677
                        "UpdateSwitchLBRule",
×
678
                        workqueue.NewTypedMaxOfRateLimiter(
×
679
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
680
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
681
                        ),
×
682
                )
×
683

×
684
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
685
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
686
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
687
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
688
        }
×
689

690
        if config.EnableNP {
×
691
                controller.npsLister = npInformer.Lister()
×
692
                controller.npsSynced = npInformer.Informer().HasSynced
×
693
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
694
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
695
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
696
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
697
        }
×
698

699
        if config.EnableANP {
×
700
                controller.anpsLister = anpInformer.Lister()
×
701
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
702
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
703
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
704
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
705
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
706

×
707
                controller.banpsLister = banpInformer.Lister()
×
708
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
709
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
710
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
711
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
712
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
713

×
714
                controller.cnpsLister = cnpInformer.Lister()
×
715
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
716
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
717
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
718
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
719
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
720
        }
×
721

722
        if config.EnableDNSNameResolver {
×
723
                if !config.EnableANP {
×
724
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
725
                }
×
726
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
727
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
728
                if err := dnsNameResolverInformer.Informer().AddIndexers(cache.Indexers{
×
729
                        IndexDNSNameResolverByName: indexDNSNameResolverByName,
×
730
                }); err != nil {
×
731
                        util.LogFatalAndExit(err, "failed to add DNSNameResolver indexer")
×
732
                }
×
UNCOV
733
                controller.dnsNameResolverIndexer = dnsNameResolverInformer.Informer().GetIndexer()
×
UNCOV
734
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
735
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
736
        }
737

UNCOV
738
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer(), ipInformer.Informer()); err != nil {
×
739
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
740
        }
×
741

742
        defer controller.shutdown()
×
743
        klog.Info("Starting OVN controller")
×
744

×
745
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
746
        // NAD CRD is optional, so we check if it exists before starting the informer
×
747
        controller.StartNetAttachInformerFactory(ctx)
×
748

×
749
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
750
        // don't have the API at all. Best-effort start with periodic retry.
×
751
        controller.StartServiceCIDRInformerFactory(ctx)
×
752

×
753
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
754
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
755
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
756
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
757

×
758
        // Wait for the caches to be synced before starting workers
×
759
        controller.informerFactory.Start(ctx.Done())
×
760
        controller.cmInformerFactory.Start(ctx.Done())
×
761
        controller.deployInformerFactory.Start(ctx.Done())
×
762
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
763
        controller.anpInformerFactory.Start(ctx.Done())
×
764
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
765

×
766
        klog.Info("Waiting for informer caches to sync")
×
767
        cacheSyncs := []cache.InformerSynced{
×
768
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
769
                controller.vpcSynced, controller.subnetSynced,
×
770
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
771
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
772
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
773
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
774
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
775
                controller.ovnDnatRuleSynced,
×
776
        }
×
777
        if controller.config.EnableLb {
×
778
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
779
        }
×
780
        if controller.config.EnableNP {
×
781
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
782
        }
×
783
        if controller.config.EnableANP {
×
784
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
785
        }
×
UNCOV
786
        if controller.config.EnableDNSNameResolver {
×
787
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
788
        }
×
789

UNCOV
790
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
791
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
792
        }
×
793

794
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
795
                AddFunc:    controller.enqueueAddPod,
×
796
                DeleteFunc: controller.enqueueDeletePod,
×
797
                UpdateFunc: controller.enqueueUpdatePod,
×
UNCOV
798
        }); err != nil {
×
799
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
800
        }
×
801

802
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
803
                AddFunc:    controller.enqueueAddNamespace,
×
804
                UpdateFunc: controller.enqueueUpdateNamespace,
×
805
                DeleteFunc: controller.enqueueDeleteNamespace,
×
UNCOV
806
        }); err != nil {
×
807
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
808
        }
×
809

810
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
811
                AddFunc:    controller.enqueueAddNode,
×
812
                UpdateFunc: controller.enqueueUpdateNode,
×
813
                DeleteFunc: controller.enqueueDeleteNode,
×
UNCOV
814
        }); err != nil {
×
815
                util.LogFatalAndExit(err, "failed to add node event handler")
×
816
        }
×
817

818
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
819
                AddFunc:    controller.enqueueAddService,
×
820
                DeleteFunc: controller.enqueueDeleteService,
×
821
                UpdateFunc: controller.enqueueUpdateService,
×
UNCOV
822
        }); err != nil {
×
823
                util.LogFatalAndExit(err, "failed to add service event handler")
×
824
        }
×
825

826
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
827
                AddFunc:    controller.enqueueAddEndpointSlice,
×
828
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
UNCOV
829
        }); err != nil {
×
830
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
831
        }
×
832

833
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
834
                AddFunc:    controller.enqueueAddDeployment,
×
835
                UpdateFunc: controller.enqueueUpdateDeployment,
×
UNCOV
836
        }); err != nil {
×
837
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
838
        }
×
839

840
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
841
                AddFunc:    controller.enqueueAddVpc,
×
842
                UpdateFunc: controller.enqueueUpdateVpc,
×
843
                DeleteFunc: controller.enqueueDelVpc,
×
UNCOV
844
        }); err != nil {
×
845
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
846
        }
×
847

848
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
849
                AddFunc:    controller.enqueueAddVpcNatGw,
×
850
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
851
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
UNCOV
852
        }); err != nil {
×
853
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
854
        }
×
855

856
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
857
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
858
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
859
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
UNCOV
860
        }); err != nil {
×
861
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
862
        }
×
863

864
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
865
                AddFunc:    controller.enqueueAddSubnet,
×
866
                UpdateFunc: controller.enqueueUpdateSubnet,
×
867
                DeleteFunc: controller.enqueueDeleteSubnet,
×
UNCOV
868
        }); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
870
        }
×
871

872
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                AddFunc:    controller.enqueueAddIPPool,
×
874
                UpdateFunc: controller.enqueueUpdateIPPool,
×
875
                DeleteFunc: controller.enqueueDeleteIPPool,
×
UNCOV
876
        }); err != nil {
×
877
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
878
        }
×
879

880
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                AddFunc:    controller.enqueueAddIP,
×
882
                UpdateFunc: controller.enqueueUpdateIP,
×
883
                DeleteFunc: controller.enqueueDelIP,
×
UNCOV
884
        }); err != nil {
×
885
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
886
        }
×
887

888
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                AddFunc:    controller.enqueueAddVlan,
×
890
                DeleteFunc: controller.enqueueDelVlan,
×
891
                UpdateFunc: controller.enqueueUpdateVlan,
×
UNCOV
892
        }); err != nil {
×
893
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
894
        }
×
895

896
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                AddFunc:    controller.enqueueAddSg,
×
898
                DeleteFunc: controller.enqueueDeleteSg,
×
899
                UpdateFunc: controller.enqueueUpdateSg,
×
UNCOV
900
        }); err != nil {
×
901
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
902
        }
×
903

904
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                AddFunc:    controller.enqueueAddVirtualIP,
×
906
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
907
                DeleteFunc: controller.enqueueDelVirtualIP,
×
UNCOV
908
        }); err != nil {
×
909
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
910
        }
×
911

912
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                AddFunc:    controller.enqueueAddIptablesEip,
×
914
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
915
                DeleteFunc: controller.enqueueDelIptablesEip,
×
UNCOV
916
        }); err != nil {
×
917
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
918
        }
×
919

920
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
921
                AddFunc:    controller.enqueueAddIptablesFip,
×
922
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
923
                DeleteFunc: controller.enqueueDelIptablesFip,
×
UNCOV
924
        }); err != nil {
×
925
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
926
        }
×
927

928
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
929
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
930
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
931
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
UNCOV
932
        }); err != nil {
×
933
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
934
        }
×
935

936
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
937
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
938
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
939
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
UNCOV
940
        }); err != nil {
×
941
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
942
        }
×
943

944
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
945
                AddFunc:    controller.enqueueAddOvnEip,
×
946
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
947
                DeleteFunc: controller.enqueueDelOvnEip,
×
UNCOV
948
        }); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
950
        }
×
951

952
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
953
                AddFunc:    controller.enqueueAddOvnFip,
×
954
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
955
                DeleteFunc: controller.enqueueDelOvnFip,
×
UNCOV
956
        }); err != nil {
×
957
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
958
        }
×
959

960
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
961
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
962
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
963
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
UNCOV
964
        }); err != nil {
×
965
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
966
        }
×
967

968
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
969
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
970
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
971
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
UNCOV
972
        }); err != nil {
×
973
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
974
        }
×
975

976
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
977
                AddFunc:    controller.enqueueAddQoSPolicy,
×
978
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
979
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
UNCOV
980
        }); err != nil {
×
981
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
982
        }
×
983

984
        if config.EnableLb {
×
985
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
986
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
987
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
988
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
UNCOV
989
                }); err != nil {
×
990
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
991
                }
×
992

993
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
994
                        AddFunc:    controller.enqueueAddVpcDNS,
×
995
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
996
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
UNCOV
997
                }); err != nil {
×
UNCOV
998
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
999
                }
×
1000
        }
1001

1002
        if config.EnableNP {
×
1003
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1004
                        AddFunc:    controller.enqueueAddNp,
×
1005
                        UpdateFunc: controller.enqueueUpdateNp,
×
1006
                        DeleteFunc: controller.enqueueDeleteNp,
×
UNCOV
1007
                }); err != nil {
×
UNCOV
1008
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
1009
                }
×
1010
        }
1011

1012
        if config.EnableANP {
×
1013
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1014
                        AddFunc:    controller.enqueueAddAnp,
×
1015
                        UpdateFunc: controller.enqueueUpdateAnp,
×
1016
                        DeleteFunc: controller.enqueueDeleteAnp,
×
UNCOV
1017
                }); err != nil {
×
1018
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1019
                }
×
1020

1021
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1022
                        AddFunc:    controller.enqueueAddBanp,
×
1023
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1024
                        DeleteFunc: controller.enqueueDeleteBanp,
×
UNCOV
1025
                }); err != nil {
×
1026
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1027
                }
×
1028

1029
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1030
                        AddFunc:    controller.enqueueAddCnp,
×
1031
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1032
                        DeleteFunc: controller.enqueueDeleteCnp,
×
UNCOV
1033
                }); err != nil {
×
1034
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1035
                }
×
1036

1037
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1038
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
UNCOV
1039
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
UNCOV
1040
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1041
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1042
        }
1043

1044
        if config.EnableDNSNameResolver {
×
1045
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1046
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1047
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1048
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
UNCOV
1049
                }); err != nil {
×
UNCOV
1050
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1051
                }
×
1052
        }
1053

1054
        if config.EnableOVNIPSec {
×
1055
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1056
                        AddFunc:    controller.enqueueAddCsr,
×
1057
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1058
                        // no need to add delete func for csr
×
UNCOV
1059
                }); err != nil {
×
UNCOV
1060
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1061
                }
×
1062
        }
1063

UNCOV
1064
        controller.Run(ctx)
×
1065
}
1066

1067
// Run will set up the event handlers for types we are interested in, as well
1068
// as syncing informer caches and starting workers. It will block until stopCh
1069
// is closed, at which point it will shutdown the workqueue and wait for
1070
// workers to finish processing their current work items.
1071
func (c *Controller) Run(ctx context.Context) {
×
1072
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1073
        // Otherwise, the init process should be placed after all workers have already started working
×
UNCOV
1074
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1075
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1076
        }
×
1077

UNCOV
1078
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1079
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1080
        }
×
1081

UNCOV
1082
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1083
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1084
        }
×
1085

UNCOV
1086
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1087
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1088
        }
×
1089

UNCOV
1090
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1091
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1092
        }
×
1093

UNCOV
1094
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1095
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1096
        }
×
1097

UNCOV
1098
        if err := c.InitOVN(); err != nil {
×
UNCOV
1099
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1100
        }
×
1101

1102
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
UNCOV
1103
        if err := c.syncIPCR(); err != nil {
×
1104
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1105
        }
×
1106

UNCOV
1107
        if err := c.syncFinalizers(); err != nil {
×
1108
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1109
        }
×
1110

UNCOV
1111
        if err := c.InitIPAM(); err != nil {
×
1112
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1113
        }
×
1114

UNCOV
1115
        if err := c.syncNodeRoutes(); err != nil {
×
1116
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1117
        }
×
1118

UNCOV
1119
        if err := c.syncSubnetCR(); err != nil {
×
1120
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1121
        }
×
1122

UNCOV
1123
        if err := c.syncVlanCR(); err != nil {
×
1124
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1125
        }
×
1126

1127
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
UNCOV
1128
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
UNCOV
1129
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
UNCOV
1130
                }
×
1131
        }
1132

1133
        // start workers to do all the network operations
1134
        c.startWorkers(ctx)
×
1135

×
1136
        c.initResourceOnce()
×
1137
        <-ctx.Done()
×
1138
        klog.Info("Shutting down workers")
×
UNCOV
1139

×
UNCOV
1140
        c.OVNNbClient.Close()
×
1141
        c.OVNSbClient.Close()
×
1142
}
1143

1144
func (c *Controller) dbStatus() {
×
1145
        const maxFailures = 5
×
1146

×
1147
        done := make(chan error, 2)
×
1148
        go func() {
×
1149
                done <- c.OVNNbClient.Echo(context.Background())
×
1150
        }()
×
UNCOV
1151
        go func() {
×
1152
                done <- c.OVNSbClient.Echo(context.Background())
×
1153
        }()
×
1154

1155
        resultsReceived := 0
×
1156
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1157

×
1158
        for resultsReceived < 2 {
×
1159
                select {
×
1160
                case err := <-done:
×
1161
                        resultsReceived++
×
1162
                        if err != nil {
×
1163
                                c.dbFailureCount++
×
1164
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1165
                                if c.dbFailureCount >= maxFailures {
×
UNCOV
1166
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1167
                                }
×
1168
                                return
×
1169
                        }
1170
                case <-timeout:
×
1171
                        c.dbFailureCount++
×
1172
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1173
                        if c.dbFailureCount >= maxFailures {
×
UNCOV
1174
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
UNCOV
1175
                        }
×
UNCOV
1176
                        return
×
1177
                }
1178
        }
1179

1180
        if c.dbFailureCount > 0 {
×
UNCOV
1181
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
UNCOV
1182
                c.dbFailureCount = 0
×
1183
        }
×
1184
}
1185

1186
func (c *Controller) shutdown() {
×
1187
        utilruntime.HandleCrash()
×
1188

×
1189
        c.addOrUpdatePodQueue.ShutDown()
×
1190
        c.deletePodQueue.ShutDown()
×
1191
        c.updatePodSecurityQueue.ShutDown()
×
1192

×
1193
        c.addNamespaceQueue.ShutDown()
×
1194

×
1195
        c.addOrUpdateSubnetQueue.ShutDown()
×
1196
        c.deleteSubnetQueue.ShutDown()
×
1197
        c.updateSubnetStatusQueue.ShutDown()
×
1198
        c.syncVirtualPortsQueue.ShutDown()
×
1199

×
1200
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1201
        c.updateIPPoolStatusQueue.ShutDown()
×
1202
        c.deleteIPPoolQueue.ShutDown()
×
1203

×
1204
        c.addNodeQueue.ShutDown()
×
1205
        c.updateNodeQueue.ShutDown()
×
1206
        c.deleteNodeQueue.ShutDown()
×
1207

×
1208
        c.addServiceQueue.ShutDown()
×
1209
        c.deleteServiceQueue.ShutDown()
×
1210
        c.updateServiceQueue.ShutDown()
×
1211
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1212

×
1213
        c.addVlanQueue.ShutDown()
×
1214
        c.delVlanQueue.ShutDown()
×
1215
        c.updateVlanQueue.ShutDown()
×
1216

×
1217
        c.addOrUpdateVpcQueue.ShutDown()
×
1218
        c.updateVpcStatusQueue.ShutDown()
×
1219
        c.delVpcQueue.ShutDown()
×
1220

×
1221
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1222
        c.initVpcNatGatewayQueue.ShutDown()
×
1223
        c.delVpcNatGatewayQueue.ShutDown()
×
1224
        c.updateVpcEipQueue.ShutDown()
×
1225
        c.updateVpcFloatingIPQueue.ShutDown()
×
1226
        c.updateVpcDnatQueue.ShutDown()
×
1227
        c.updateVpcSnatQueue.ShutDown()
×
1228
        c.updateVpcSubnetQueue.ShutDown()
×
1229

×
1230
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1231
        c.delVpcEgressGatewayQueue.ShutDown()
×
1232

×
1233
        if c.config.EnableLb {
×
1234
                c.addSwitchLBRuleQueue.ShutDown()
×
1235
                c.delSwitchLBRuleQueue.ShutDown()
×
1236
                c.updateSwitchLBRuleQueue.ShutDown()
×
1237

×
UNCOV
1238
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1239
                c.delVpcDNSQueue.ShutDown()
×
1240
        }
×
1241

1242
        c.addIPQueue.ShutDown()
×
1243
        c.updateIPQueue.ShutDown()
×
1244
        c.delIPQueue.ShutDown()
×
1245

×
1246
        c.addVirtualIPQueue.ShutDown()
×
1247
        c.updateVirtualIPQueue.ShutDown()
×
1248
        c.updateVirtualParentsQueue.ShutDown()
×
1249
        c.delVirtualIPQueue.ShutDown()
×
1250

×
1251
        c.addIptablesEipQueue.ShutDown()
×
1252
        c.updateIptablesEipQueue.ShutDown()
×
1253
        c.resetIptablesEipQueue.ShutDown()
×
1254
        c.delIptablesEipQueue.ShutDown()
×
1255

×
1256
        c.addIptablesFipQueue.ShutDown()
×
1257
        c.updateIptablesFipQueue.ShutDown()
×
1258
        c.delIptablesFipQueue.ShutDown()
×
1259

×
1260
        c.addIptablesDnatRuleQueue.ShutDown()
×
1261
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1262
        c.delIptablesDnatRuleQueue.ShutDown()
×
1263

×
1264
        c.addIptablesSnatRuleQueue.ShutDown()
×
1265
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1266
        c.delIptablesSnatRuleQueue.ShutDown()
×
1267

×
1268
        c.addQoSPolicyQueue.ShutDown()
×
1269
        c.updateQoSPolicyQueue.ShutDown()
×
1270
        c.delQoSPolicyQueue.ShutDown()
×
1271

×
1272
        c.addOvnEipQueue.ShutDown()
×
1273
        c.updateOvnEipQueue.ShutDown()
×
1274
        c.resetOvnEipQueue.ShutDown()
×
1275
        c.delOvnEipQueue.ShutDown()
×
1276

×
1277
        c.addOvnFipQueue.ShutDown()
×
1278
        c.updateOvnFipQueue.ShutDown()
×
1279
        c.delOvnFipQueue.ShutDown()
×
1280

×
1281
        c.addOvnSnatRuleQueue.ShutDown()
×
1282
        c.updateOvnSnatRuleQueue.ShutDown()
×
1283
        c.delOvnSnatRuleQueue.ShutDown()
×
1284

×
1285
        c.addOvnDnatRuleQueue.ShutDown()
×
1286
        c.updateOvnDnatRuleQueue.ShutDown()
×
1287
        c.delOvnDnatRuleQueue.ShutDown()
×
1288

×
1289
        if c.config.EnableNP {
×
1290
                c.updateNpQueue.ShutDown()
×
1291
                c.deleteNpQueue.ShutDown()
×
1292
        }
×
1293
        if c.config.EnableANP {
×
1294
                c.addAnpQueue.ShutDown()
×
1295
                c.updateAnpQueue.ShutDown()
×
1296
                c.deleteAnpQueue.ShutDown()
×
1297

×
1298
                c.addBanpQueue.ShutDown()
×
1299
                c.updateBanpQueue.ShutDown()
×
1300
                c.deleteBanpQueue.ShutDown()
×
1301

×
1302
                c.addCnpQueue.ShutDown()
×
UNCOV
1303
                c.updateCnpQueue.ShutDown()
×
1304
                c.deleteCnpQueue.ShutDown()
×
1305
        }
×
1306

1307
        if c.config.EnableDNSNameResolver {
×
UNCOV
1308
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1309
                c.deleteDNSNameResolverQueue.ShutDown()
×
1310
        }
×
1311

1312
        c.addOrUpdateSgQueue.ShutDown()
×
1313
        c.delSgQueue.ShutDown()
×
1314
        c.syncSgPortsQueue.ShutDown()
×
1315

×
1316
        c.addOrUpdateCsrQueue.ShutDown()
×
1317

×
UNCOV
1318
        if c.config.EnableLiveMigrationOptimize {
×
UNCOV
1319
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1320
        }
×
1321
}
1322

1323
func (c *Controller) startWorkers(ctx context.Context) {
×
1324
        klog.Info("Starting workers")
×
1325

×
1326
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1327
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1329

×
1330
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1331
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1332
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1333
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1334
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1335
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1336
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1337
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1338
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1339
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1340
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1341
        // add default and join subnet and wait them ready
×
1342
        for range c.config.WorkerNum {
×
1343
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1344
        }
×
1345
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1346
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1347
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1348
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1349
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1350
                klog.Infof("wait for subnets %v ready", subnets)
×
1351

×
1352
                return c.allSubnetReady(subnets...)
×
1353
        })
×
UNCOV
1354
        if err != nil {
×
1355
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1356
        }
×
1357

1358
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1359
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1360
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1361

×
1362
        // run node worker before handle any pods
×
1363
        for range c.config.WorkerNum {
×
1364
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1365
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1366
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1367
        }
×
1368
        for {
×
1369
                ready := true
×
1370
                time.Sleep(3 * time.Second)
×
1371
                nodes, err := c.nodesLister.List(labels.Everything())
×
1372
                if err != nil {
×
1373
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1374
                }
×
1375
                for _, node := range nodes {
×
1376
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
UNCOV
1377
                                klog.Infof("wait node %s annotation ready", node.Name)
×
UNCOV
1378
                                ready = false
×
1379
                                break
×
1380
                        }
1381
                }
UNCOV
1382
                if ready {
×
UNCOV
1383
                        break
×
1384
                }
1385
        }
1386

1387
        if c.config.EnableLb {
×
1388
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1389
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1390
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1391

×
1392
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1393
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1394
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1395

×
1396
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1397
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
UNCOV
1398
                go wait.Until(func() {
×
UNCOV
1399
                        c.resyncVpcDNSConfig()
×
1400
                }, 5*time.Second, ctx.Done())
×
1401
        }
1402

1403
        for range c.config.WorkerNum {
×
1404
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1405
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1406
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1407

×
1408
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1409
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1410
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1411
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1412
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1413

×
1414
                if c.config.EnableLb {
×
UNCOV
1415
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1416
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1417
                }
×
1418

1419
                if c.config.EnableNP {
×
UNCOV
1420
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1421
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1422
                }
×
1423

UNCOV
1424
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1425
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1426
        }
1427

1428
        if c.config.EnableEipSnat {
×
1429
                go wait.Until(func() {
×
UNCOV
1430
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
UNCOV
1431
                        c.resyncExternalGateway()
×
1432
                }, time.Second, ctx.Done())
×
1433

1434
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1435
                c.OVNNbClient.MonitorBFD()
×
1436
        }
1437
        // TODO: we should merge these two vpc nat config into one config and resync them together
UNCOV
1438
        go wait.Until(func() {
×
1439
                c.resyncVpcNatGwConfig()
×
1440
        }, time.Second, ctx.Done())
×
1441

UNCOV
1442
        go wait.Until(func() {
×
1443
                c.resyncVpcNatConfig()
×
1444
        }, time.Second, ctx.Done())
×
1445

1446
        if c.config.GCInterval != 0 {
×
1447
                go wait.Until(func() {
×
UNCOV
1448
                        if err := c.markAndCleanLSP(); err != nil {
×
UNCOV
1449
                                klog.Errorf("gc lsp error: %v", err)
×
UNCOV
1450
                        }
×
1451
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1452
        }
1453

1454
        go wait.Until(func() {
×
UNCOV
1455
                if err := c.inspectPod(); err != nil {
×
UNCOV
1456
                        klog.Errorf("inspection error: %v", err)
×
1457
                }
×
1458
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1459

1460
        if c.config.EnableExternalVpc {
×
UNCOV
1461
                go wait.Until(func() {
×
UNCOV
1462
                        c.syncExternalVpc()
×
1463
                }, 5*time.Second, ctx.Done())
×
1464
        }
1465

1466
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1467
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1468
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1469
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1470

×
1471
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1472
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1473
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1474
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1475

×
1476
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1477
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1478
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1479

×
1480
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1481
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1482
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1483

×
1484
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1485
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1486
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1487

×
1488
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1489

×
1490
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1491
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1492
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1493

×
1494
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1495
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1496
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1497
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1498

×
1499
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1500
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1501
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1502
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1503

×
1504
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1505
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1506
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1507

×
1508
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1509
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1510
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1511

×
1512
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1513
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1514
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1515

×
1516
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1517
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1518
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1519

×
1520
        if c.config.EnableANP {
×
1521
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1522
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1523
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1524

×
1525
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1526
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1527
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1528

×
1529
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
UNCOV
1530
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1531
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1532
        }
×
1533

1534
        if c.config.EnableDNSNameResolver {
×
UNCOV
1535
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1536
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1537
        }
×
1538

UNCOV
1539
        if c.config.EnableLiveMigrationOptimize {
×
1540
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1541
        }
×
1542

UNCOV
1543
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
UNCOV
1544

×
UNCOV
1545
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1546
}
1547

1548
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1549
        for _, lsName := range subnets {
2✔
1550
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1551
                if err != nil {
1✔
UNCOV
1552
                        klog.Error(err)
×
UNCOV
1553
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
UNCOV
1554
                }
×
1555

1556
                if !exist {
2✔
1557
                        return false, nil
1✔
1558
                }
1✔
1559
        }
1560

1561
        return true, nil
1✔
1562
}
1563

1564
func (c *Controller) initResourceOnce() {
×
1565
        c.registerSubnetMetrics()
×
1566

×
UNCOV
1567
        if err := c.initNodeChassis(); err != nil {
×
1568
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1569
        }
×
1570

1571
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1572
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1573
        }
×
UNCOV
1574
        if err := c.syncSecurityGroup(); err != nil {
×
1575
                util.LogFatalAndExit(err, "failed to sync security group")
×
1576
        }
×
1577

UNCOV
1578
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1579
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1580
        }
×
1581

1582
        if err := c.initVpcNatGw(); err != nil {
×
1583
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1584
        }
×
1585
        if c.config.EnableLb {
×
UNCOV
1586
                if err := c.initVpcDNSConfig(); err != nil {
×
UNCOV
1587
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
UNCOV
1588
                }
×
1589
        }
1590

1591
        // remove resources in ovndb that not exist any more in kubernetes resources
1592
        // process gc at last in case of affecting other init process
UNCOV
1593
        if err := c.gc(); err != nil {
×
UNCOV
1594
                util.LogFatalAndExit(err, "failed to run gc")
×
1595
        }
×
1596
}
1597

1598
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1599
        item, shutdown := queue.Get()
×
UNCOV
1600
        if shutdown {
×
1601
                return false
×
1602
        }
×
1603

1604
        err := func(item T) error {
×
1605
                defer queue.Done(item)
×
1606
                if err := handler(item); err != nil {
×
1607
                        queue.AddRateLimited(item)
×
1608
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
UNCOV
1609
                }
×
1610
                queue.Forget(item)
×
1611
                return nil
×
1612
        }(item)
1613
        if err != nil {
×
1614
                utilruntime.HandleError(err)
×
UNCOV
1615
                return true
×
UNCOV
1616
        }
×
1617
        return true
×
1618
}
1619

1620
func getWorkItemKey(obj any) string {
×
1621
        switch v := obj.(type) {
×
1622
        case string:
×
1623
                return v
×
1624
        case *vpcService:
×
1625
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1626
        case *AdminNetworkPolicyChangedDelta:
×
1627
                return v.key
×
1628
        case *SwitchLBRuleInfo:
×
1629
                return v.Name
×
1630
        default:
×
1631
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1632
                if err != nil {
×
1633
                        utilruntime.HandleError(err)
×
UNCOV
1634
                        return ""
×
UNCOV
1635
                }
×
UNCOV
1636
                return key
×
1637
        }
1638
}
1639

1640
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
UNCOV
1641
        return func() {
×
UNCOV
1642
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
UNCOV
1643
                }
×
1644
        }
1645
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc