• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27260616743

10 Jun 2026 07:31AM UTC coverage: 25.284% (+0.08%) from 25.208%
27260616743

push

github

oilbeater
fix(controller): avoid nil queue panic in DNS name resolver handlers (#6845)

When the controller starts with --enable-dns-name-resolver=true but
--enable-anp=false, updateAnpQueue and updateCnpQueue are never
constructed. Any DNSNameResolver CR carrying the "anp" label (e.g. left
over from a period when ANP was enabled) then drives
handleAddOrUpdateDNSNameResolver/handleDeleteDNSNameResolver into
calling Add on a nil workqueue, which panics the whole controller
process and forms a crash loop on every restart.

Skip the handlers when ANP support is disabled and warn at startup
about the ineffective configuration.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
(cherry picked from commit 4d9089020)

10 of 13 new or added lines in 2 files covered. (76.92%)

2 existing lines in 1 file now uncovered.

14355 of 56776 relevant lines covered (25.28%)

0.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.13
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/discovery"
23
        kubeinformers "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        appsv1 "k8s.io/client-go/listers/apps/v1"
27
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
28
        v1 "k8s.io/client-go/listers/core/v1"
29
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
30
        netv1 "k8s.io/client-go/listers/networking/v1"
31
        "k8s.io/client-go/tools/cache"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        "k8s.io/klog/v2"
35
        "k8s.io/utils/keymutex"
36
        "k8s.io/utils/set"
37
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
38
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
39
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
40
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
41
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
42

43
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
44
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
45
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
46
        "github.com/kubeovn/kube-ovn/pkg/informer"
47
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
48
        "github.com/kubeovn/kube-ovn/pkg/ovs"
49
        "github.com/kubeovn/kube-ovn/pkg/util"
50
)
51

52
// controllerAgentName is the component name used as the EventSource of the
// Kubernetes events recorded by this controller (see Run).
const controllerAgentName = "kube-ovn-controller"
53

54
// Short string keys, one per resource kind named by the identifier
// (logical switch, logical router, port group, network policy, ...).
// NOTE(review): their consumers are outside this view — presumably they are
// written as label / external-ID values on managed objects; confirm at the
// call sites.
const (
55
        logicalSwitchKey              = "ls"
56
        logicalRouterKey              = "lr"
57
        portGroupKey                  = "pg"
58
        networkPolicyKey              = "np"
59
        sgKey                         = "sg"
60
        sgsKey                        = "security_groups"
61
        u2oKey                        = "u2o"
62
        adminNetworkPolicyKey         = "anp"
63
        baselineAdminNetworkPolicyKey = "banp"
64
        ippoolKey                     = "ippool"
65
        clusterNetworkPolicyKey       = "cnp"
66
)
67

68
// Controller is the main kube-ovn controller: it watches ns/pod/node/svc/ep
// and operates OVN.
//
// The struct groups, per resource kind, a lister, an informer-synced check,
// the typed rate-limiting work queues and, for some kinds, a keyed mutex.
69
type Controller struct {
70
        config *Configuration
71

72
        ipam             *ovnipam.IPAM
73
        namedPort        *NamedPort
74
        anpPrioNameMap   map[int32]string
75
        anpNamePrioMap   map[string]int32
76
        bnpPrioNameMap   map[int32]string
77
        bnpNamePrioMap   map[string]int32
78
        // NOTE(review): presumably guards the four priority maps above — confirm against their users.
        priorityMapMutex sync.RWMutex
79

80
        OVNNbClient ovs.NbClient
81
        OVNSbClient ovs.SbClient
82

83
        // ExternalGatewayType defines the external gateway type, centralized
84
        ExternalGatewayType string
85

86
        podsLister             v1.PodLister
87
        podsSynced             cache.InformerSynced
88
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
89
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
90
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
91
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
92
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
93
        podKeyMutex            keymutex.KeyMutex
94

95
        vpcsLister           kubeovnlister.VpcLister
96
        vpcSynced            cache.InformerSynced
97
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
98
        vpcLastPoliciesMap   *xsync.Map[string, string]
99
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
100
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
101
        vpcKeyMutex          keymutex.KeyMutex
102

103
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
104
        vpcNatGatewaySynced           cache.InformerSynced
105
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
106
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
107
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
108
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
109
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
110
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
112
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
113
        vpcNatGwKeyMutex              keymutex.KeyMutex
114
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
115

116
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
117
        vpcEgressGatewaySynced           cache.InformerSynced
118
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
119
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
120
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
121

122
        // bgpConfLister/evpnConfLister are published asynchronously by the
123
        // optional-CRD background poller (StartBgpEvpnConfInformerFactory), but
124
        // read by VEG worker goroutines. Atomic pointers keep the read/write
125
        // race-free without locking the hot reconcile path.
126
        bgpConfLister  atomic.Pointer[kubeovnlister.BgpConfLister]
127
        bgpConfSynced  cache.InformerSynced
128
        evpnConfLister atomic.Pointer[kubeovnlister.EvpnConfLister]
129
        evpnConfSynced cache.InformerSynced
130

131
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
132
        switchLBRuleSynced      cache.InformerSynced
133
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
134
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
135
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
136

137
        vpcDNSLister           kubeovnlister.VpcDnsLister
138
        vpcDNSSynced           cache.InformerSynced
139
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
140
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
141

142
        subnetsLister           kubeovnlister.SubnetLister
143
        subnetSynced            cache.InformerSynced
144
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
145
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
146
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
147
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
148
        subnetKeyMutex          keymutex.KeyMutex
149

150
        ippoolLister            kubeovnlister.IPPoolLister
151
        ippoolSynced            cache.InformerSynced
152
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
153
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
154
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
155
        ippoolKeyMutex          keymutex.KeyMutex
156

157
        ipsLister     kubeovnlister.IPLister
158
        ipSynced      cache.InformerSynced
159
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
160
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
161
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
162

163
        virtualIpsLister          kubeovnlister.VipLister
164
        virtualIpsSynced          cache.InformerSynced
165
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
166
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
167
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
168
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
169

170
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
171
        iptablesEipSynced      cache.InformerSynced
172
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
173
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
174
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
175
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
176

177
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
178
        iptablesFipSynced      cache.InformerSynced
179
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
180
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
181
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
182

183
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
184
        iptablesDnatRuleSynced      cache.InformerSynced
185
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
187
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
190
        iptablesSnatRuleSynced      cache.InformerSynced
191
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
193
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
194

195
        ovnEipsLister     kubeovnlister.OvnEipLister
196
        ovnEipSynced      cache.InformerSynced
197
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
198
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
199
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
200
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
201

202
        ovnFipsLister     kubeovnlister.OvnFipLister
203
        ovnFipSynced      cache.InformerSynced
204
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
205
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
206
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
207

208
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
209
        ovnSnatRuleSynced      cache.InformerSynced
210
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
211
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
212
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
213

214
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
215
        ovnDnatRuleSynced      cache.InformerSynced
216
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
217
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
218
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
219

220
        providerNetworksLister kubeovnlister.ProviderNetworkLister
221
        providerNetworkSynced  cache.InformerSynced
222

223
        vlansLister     kubeovnlister.VlanLister
224
        vlanSynced      cache.InformerSynced
225
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
226
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
227
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
228
        vlanKeyMutex    keymutex.KeyMutex
229

230
        namespacesLister  v1.NamespaceLister
231
        namespacesSynced  cache.InformerSynced
232
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
233
        nsKeyMutex        keymutex.KeyMutex
234

235
        nodesLister     v1.NodeLister
236
        nodesSynced     cache.InformerSynced
237
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
238
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
239
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
240
        nodeKeyMutex    keymutex.KeyMutex
241

242
        servicesLister     v1.ServiceLister
243
        serviceSynced      cache.InformerSynced
244
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
245
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
246
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
247
        svcKeyMutex        keymutex.KeyMutex
248

249
        endpointSlicesLister          discoveryv1.EndpointSliceLister
250
        endpointSlicesSynced          cache.InformerSynced
251
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
252
        epKeyMutex                    keymutex.KeyMutex
253

254
        deploymentsLister appsv1.DeploymentLister
255
        deploymentsSynced cache.InformerSynced
256

257
        npsLister     netv1.NetworkPolicyLister
258
        npsSynced     cache.InformerSynced
259
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
260
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
261
        npKeyMutex    keymutex.KeyMutex
262

263
        sgsLister          kubeovnlister.SecurityGroupLister
264
        sgSynced           cache.InformerSynced
265
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
266
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
267
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
268
        sgKeyMutex         keymutex.KeyMutex
269

270
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
271
        qosPolicySynced      cache.InformerSynced
272
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
273
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
274
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
275

276
        configMapsLister v1.ConfigMapLister
277
        configMapsSynced cache.InformerSynced
278

279
        anpsLister     anplister.AdminNetworkPolicyLister
280
        anpsSynced     cache.InformerSynced
281
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
282
        // NOTE(review): the description of fix #6845 says updateAnpQueue is never
        // constructed when --enable-anp=false, so it may be nil — confirm before calling Add.
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
283
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
284
        anpKeyMutex    keymutex.KeyMutex
285

286
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
287
        dnsNameResolversSynced          cache.InformerSynced
288
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
289
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
290

291
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
292
        banpsSynced     cache.InformerSynced
293
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
294
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
295
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
296
        banpKeyMutex    keymutex.KeyMutex
297

298
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
299
        cnpsSynced     cache.InformerSynced
300
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
301
        // NOTE(review): the description of fix #6845 says updateCnpQueue is never
        // constructed when --enable-anp=false, so it may be nil — confirm before calling Add.
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
302
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
303
        cnpKeyMutex    keymutex.KeyMutex
304

305
        csrLister           certListerv1.CertificateSigningRequestLister
306
        csrSynced           cache.InformerSynced
307
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
308

309
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
310
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
311
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
312

313
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
314
        netAttachSynced          cache.InformerSynced
315
        netAttachInformerFactory netAttach.SharedInformerFactory
316

317
        recorder               record.EventRecorder
318
        informerFactory        kubeinformers.SharedInformerFactory
319
        cmInformerFactory      kubeinformers.SharedInformerFactory
320
        deployInformerFactory  kubeinformers.SharedInformerFactory
321
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
322
        anpInformerFactory     anpinformer.SharedInformerFactory
323

324
        // Database health check
325
        dbFailureCount int
326

327
        distributedSubnetNeedSync atomic.Bool
328
}
329

330
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
331
        if rateLimiter == nil {
2✔
332
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
333
        }
1✔
334
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
335
}
336

337
// Run creates and runs a new ovn controller
338
func Run(ctx context.Context, config *Configuration) {
×
339
        klog.V(4).Info("Creating event broadcaster")
×
340
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
341
        eventBroadcaster.StartLogging(klog.Infof)
×
342
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
343
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
344
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
345
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
346
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
347
        )
×
348

×
349
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
350
        if err != nil {
×
351
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
352
        }
×
353

354
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
355
                kubeinformers.WithTransform(util.TrimManagedFields),
×
356
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
357
                        listOption.AllowWatchBookmarks = true
×
358
                }))
×
359
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
360
                kubeinformers.WithNamespace(config.PodNamespace),
×
361
                kubeinformers.WithTransform(util.TrimManagedFields),
×
362
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
363
                        listOption.AllowWatchBookmarks = true
×
364
                }))
×
365
        // deployment informer used to list/watch vpc egress gateway workloads
366
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
367
                kubeinformers.WithTransform(util.TrimManagedFields),
×
368
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
369
                        listOption.AllowWatchBookmarks = true
×
370
                        listOption.LabelSelector = selector.String()
×
371
                }))
×
372
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
373
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
374
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
375
                        listOption.AllowWatchBookmarks = true
×
376
                }))
×
377
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
378
                anpinformer.WithTransform(util.TrimManagedFields),
×
379
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
380
                        listOption.AllowWatchBookmarks = true
×
381
                }))
×
382
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
383
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
384
                        listOption.AllowWatchBookmarks = true
×
385
                }),
×
386
        )
387
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
388
                informer.WithTransform(util.TrimManagedFields),
×
389
        )
×
390

×
391
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
392
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
393
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
394
        // BgpConf/EvpnConf informers are started lazily via StartBgpEvpnConfInformerFactory
×
395
        // because their CRDs are optional on clusters that don't use vpc-egress-gateway BGP/EVPN.
×
396
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
397
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
398
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
399
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
400
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
401
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
402
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
403
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
404
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
405
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
406
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
407
        podInformer := informerFactory.Core().V1().Pods()
×
408
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
409
        nodeInformer := informerFactory.Core().V1().Nodes()
×
410
        serviceInformer := informerFactory.Core().V1().Services()
×
411
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
412
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
413
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
414
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
415
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
416
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
417
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
418
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
419
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
420
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
421
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
422
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
423
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
424
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
425
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
426
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
427
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
428

×
429
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
430
        controller := &Controller{
×
431
                config:             config,
×
432
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
433
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
434
                ipam:               ovnipam.NewIPAM(),
×
435
                namedPort:          NewNamedPort(),
×
436

×
437
                vpcsLister:           vpcInformer.Lister(),
×
438
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
439
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
440
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
441
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
442
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
443
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
444

×
445
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
446
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
447
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
448
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
449
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
450
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
451
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
452
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
453
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
454
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
455
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
456
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
457
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
458
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
459
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
460
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
461
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
462

×
463
                // bgpConfLister/bgpConfSynced/evpnConfLister/evpnConfSynced are populated lazily
×
464
                // in startBgpEvpnConfInformer once the matching CRDs are detected.
×
465

×
466
                subnetsLister:           subnetInformer.Lister(),
×
467
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
468
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
469
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
470
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
471
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
472
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
473

×
474
                ippoolLister:            ippoolInformer.Lister(),
×
475
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
476
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
477
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
478
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
479
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
480

×
481
                ipsLister:     ipInformer.Lister(),
×
482
                ipSynced:      ipInformer.Informer().HasSynced,
×
483
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
484
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
485
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
486

×
487
                virtualIpsLister:          virtualIPInformer.Lister(),
×
488
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
489
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
490
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
491
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
492
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
493

×
494
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
495
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
496
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
497
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
498
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
499
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
500

×
501
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
502
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
503
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
504
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
505
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
506

×
507
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
508
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
509
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
510
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
511
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
512

×
513
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
514
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
515
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
516
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
517
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
518

×
519
                vlansLister:     vlanInformer.Lister(),
×
520
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
521
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
522
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
523
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
524
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
525

×
526
                providerNetworksLister: providerNetworkInformer.Lister(),
×
527
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
528

×
529
                podsLister:          podInformer.Lister(),
×
530
                podsSynced:          podInformer.Informer().HasSynced,
×
531
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
532
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
533
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
534
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
535
                                Name:          "DeletePod",
×
536
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
537
                        },
×
538
                ),
×
539
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
540
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
541

×
542
                namespacesLister:  namespaceInformer.Lister(),
×
543
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
544
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
545
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
546

×
547
                nodesLister:     nodeInformer.Lister(),
×
548
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
549
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
550
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
551
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
552
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
553

×
554
                servicesLister:     serviceInformer.Lister(),
×
555
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
556
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
557
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
558
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
559
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
560

×
561
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
562
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
563
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
564
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
565

×
566
                deploymentsLister: deploymentInformer.Lister(),
×
567
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
568

×
569
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
570
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
571
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
572
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
573
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
574

×
575
                configMapsLister: configMapInformer.Lister(),
×
576
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
577

×
578
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
579
                sgsLister:          sgInformer.Lister(),
×
580
                sgSynced:           sgInformer.Informer().HasSynced,
×
581
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
582
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
583
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
584

×
585
                ovnEipsLister:     ovnEipInformer.Lister(),
×
586
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
587
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
588
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
589
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
590
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
591

×
592
                ovnFipsLister:     ovnFipInformer.Lister(),
×
593
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
594
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
595
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
596
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
597

×
598
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
599
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
600
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
601
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
602
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
603

×
604
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
605
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
606
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
607
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
608
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
609

×
610
                csrLister:           csrInformer.Lister(),
×
611
                csrSynced:           csrInformer.Informer().HasSynced,
×
612
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
613

×
614
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
615
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
616
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
617

×
618
                netAttachLister:          netAttachInformer.Lister(),
×
619
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
620
                netAttachInformerFactory: attachNetInformerFactory,
×
621

×
622
                recorder:               recorder,
×
623
                informerFactory:        informerFactory,
×
624
                cmInformerFactory:      cmInformerFactory,
×
625
                deployInformerFactory:  deployInformerFactory,
×
626
                kubeovnInformerFactory: kubeovnInformerFactory,
×
627
                anpInformerFactory:     anpInformerFactory,
×
628
        }
×
629

×
630
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
631
                config.OvnNbAddr,
×
632
                config.OvnTimeout,
×
633
                config.OvsDbConnectTimeout,
×
634
                config.OvsDbInactivityTimeout,
×
635
                config.OvsDbConnectMaxRetry,
×
636
        ); err != nil {
×
637
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
638
        }
×
639
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
640
                config.OvnSbAddr,
×
641
                config.OvnTimeout,
×
642
                config.OvsDbConnectTimeout,
×
643
                config.OvsDbInactivityTimeout,
×
644
                config.OvsDbConnectMaxRetry,
×
645
        ); err != nil {
×
646
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
647
        }
×
648
        if config.EnableLb {
×
649
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
650
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
651
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
652
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
653
                        "DeleteSwitchLBRule",
×
654
                        workqueue.NewTypedMaxOfRateLimiter(
×
655
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
656
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
657
                        ),
×
658
                )
×
659
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
660
                        "UpdateSwitchLBRule",
×
661
                        workqueue.NewTypedMaxOfRateLimiter(
×
662
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
663
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
664
                        ),
×
665
                )
×
666

×
667
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
668
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
669
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
670
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
671
        }
×
672

673
        if config.EnableNP {
×
674
                controller.npsLister = npInformer.Lister()
×
675
                controller.npsSynced = npInformer.Informer().HasSynced
×
676
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
677
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
678
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
679
        }
×
680

681
        if config.EnableANP {
×
682
                controller.anpsLister = anpInformer.Lister()
×
683
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
684
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
685
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
686
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
687
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
688

×
689
                controller.banpsLister = banpInformer.Lister()
×
690
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
691
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
692
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
693
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
694
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
695

×
696
                controller.cnpsLister = cnpInformer.Lister()
×
697
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
698
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
699
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
700
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
701
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
702
        }
×
703

704
        if config.EnableDNSNameResolver {
×
NEW
705
                if !config.EnableANP {
×
NEW
706
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
NEW
707
                }
×
708
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
709
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
710
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
711
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
712
        }
713

714
        defer controller.shutdown()
×
715
        klog.Info("Starting OVN controller")
×
716

×
717
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
718
        // NAD CRD is optional, so we check if it exists before starting the informer
×
719
        controller.StartNetAttachInformerFactory(ctx)
×
720

×
721
        // BgpConf/EvpnConf are optional CRDs (v1.16.0+, used by vpc-egress-gateway BGP/EVPN).
×
722
        // They may be missing on clusters upgraded from <v1.16 via Helm, which does not
×
723
        // re-apply the `crds/` directory on `helm upgrade`. Best-effort start with periodic retry.
×
724
        controller.StartBgpEvpnConfInformerFactory(ctx)
×
725

×
726
        // Wait for the caches to be synced before starting workers
×
727
        controller.informerFactory.Start(ctx.Done())
×
728
        controller.cmInformerFactory.Start(ctx.Done())
×
729
        controller.deployInformerFactory.Start(ctx.Done())
×
730
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
731
        controller.anpInformerFactory.Start(ctx.Done())
×
732
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
733

×
734
        klog.Info("Waiting for informer caches to sync")
×
735
        cacheSyncs := []cache.InformerSynced{
×
736
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
737
                controller.vpcSynced, controller.subnetSynced,
×
738
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
739
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
740
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
741
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
742
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
743
                controller.ovnDnatRuleSynced,
×
744
        }
×
745
        if controller.config.EnableLb {
×
746
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
747
        }
×
748
        if controller.config.EnableNP {
×
749
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
750
        }
×
751
        if controller.config.EnableANP {
×
752
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
753
        }
×
754
        if controller.config.EnableDNSNameResolver {
×
755
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
756
        }
×
757

758
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
759
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
760
        }
×
761

762
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
763
                AddFunc:    controller.enqueueAddPod,
×
764
                DeleteFunc: controller.enqueueDeletePod,
×
765
                UpdateFunc: controller.enqueueUpdatePod,
×
766
        }); err != nil {
×
767
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
768
        }
×
769

770
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
771
                AddFunc:    controller.enqueueAddNamespace,
×
772
                UpdateFunc: controller.enqueueUpdateNamespace,
×
773
                DeleteFunc: controller.enqueueDeleteNamespace,
×
774
        }); err != nil {
×
775
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
776
        }
×
777

778
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
779
                AddFunc:    controller.enqueueAddNode,
×
780
                UpdateFunc: controller.enqueueUpdateNode,
×
781
                DeleteFunc: controller.enqueueDeleteNode,
×
782
        }); err != nil {
×
783
                util.LogFatalAndExit(err, "failed to add node event handler")
×
784
        }
×
785

786
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
787
                AddFunc:    controller.enqueueAddService,
×
788
                DeleteFunc: controller.enqueueDeleteService,
×
789
                UpdateFunc: controller.enqueueUpdateService,
×
790
        }); err != nil {
×
791
                util.LogFatalAndExit(err, "failed to add service event handler")
×
792
        }
×
793

794
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
795
                AddFunc:    controller.enqueueAddEndpointSlice,
×
796
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
797
        }); err != nil {
×
798
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
799
        }
×
800

801
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
802
                AddFunc:    controller.enqueueAddDeployment,
×
803
                UpdateFunc: controller.enqueueUpdateDeployment,
×
804
        }); err != nil {
×
805
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
806
        }
×
807

808
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
809
                AddFunc:    controller.enqueueAddVpc,
×
810
                UpdateFunc: controller.enqueueUpdateVpc,
×
811
                DeleteFunc: controller.enqueueDelVpc,
×
812
        }); err != nil {
×
813
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
814
        }
×
815

816
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
817
                AddFunc:    controller.enqueueAddVpcNatGw,
×
818
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
819
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
820
        }); err != nil {
×
821
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
822
        }
×
823

824
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
825
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
826
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
827
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
828
        }); err != nil {
×
829
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
830
        }
×
831

832
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
833
                AddFunc:    controller.enqueueAddSubnet,
×
834
                UpdateFunc: controller.enqueueUpdateSubnet,
×
835
                DeleteFunc: controller.enqueueDeleteSubnet,
×
836
        }); err != nil {
×
837
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
838
        }
×
839

840
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
841
                AddFunc:    controller.enqueueAddIPPool,
×
842
                UpdateFunc: controller.enqueueUpdateIPPool,
×
843
                DeleteFunc: controller.enqueueDeleteIPPool,
×
844
        }); err != nil {
×
845
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
846
        }
×
847

848
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
849
                AddFunc:    controller.enqueueAddIP,
×
850
                UpdateFunc: controller.enqueueUpdateIP,
×
851
                DeleteFunc: controller.enqueueDelIP,
×
852
        }); err != nil {
×
853
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
854
        }
×
855

856
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
857
                AddFunc:    controller.enqueueAddVlan,
×
858
                DeleteFunc: controller.enqueueDelVlan,
×
859
                UpdateFunc: controller.enqueueUpdateVlan,
×
860
        }); err != nil {
×
861
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
862
        }
×
863

864
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
865
                AddFunc:    controller.enqueueAddSg,
×
866
                DeleteFunc: controller.enqueueDeleteSg,
×
867
                UpdateFunc: controller.enqueueUpdateSg,
×
868
        }); err != nil {
×
869
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
870
        }
×
871

872
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
873
                AddFunc:    controller.enqueueAddVirtualIP,
×
874
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
875
                DeleteFunc: controller.enqueueDelVirtualIP,
×
876
        }); err != nil {
×
877
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
878
        }
×
879

880
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
881
                AddFunc:    controller.enqueueAddIptablesEip,
×
882
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
883
                DeleteFunc: controller.enqueueDelIptablesEip,
×
884
        }); err != nil {
×
885
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
886
        }
×
887

888
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
889
                AddFunc:    controller.enqueueAddIptablesFip,
×
890
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
891
                DeleteFunc: controller.enqueueDelIptablesFip,
×
892
        }); err != nil {
×
893
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
894
        }
×
895

896
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
897
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
898
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
899
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
900
        }); err != nil {
×
901
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
902
        }
×
903

904
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
905
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
906
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
907
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
908
        }); err != nil {
×
909
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
910
        }
×
911

912
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
913
                AddFunc:    controller.enqueueAddOvnEip,
×
914
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
915
                DeleteFunc: controller.enqueueDelOvnEip,
×
916
        }); err != nil {
×
917
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
918
        }
×
919

920
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
921
                AddFunc:    controller.enqueueAddOvnFip,
×
922
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
923
                DeleteFunc: controller.enqueueDelOvnFip,
×
924
        }); err != nil {
×
925
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
926
        }
×
927

928
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
929
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
930
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
931
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
932
        }); err != nil {
×
933
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
934
        }
×
935

936
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
937
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
938
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
939
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
940
        }); err != nil {
×
941
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
942
        }
×
943

944
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
945
                AddFunc:    controller.enqueueAddQoSPolicy,
×
946
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
947
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
948
        }); err != nil {
×
949
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
950
        }
×
951

952
        if config.EnableLb {
×
953
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
954
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
955
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
956
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
957
                }); err != nil {
×
958
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
959
                }
×
960

961
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
962
                        AddFunc:    controller.enqueueAddVpcDNS,
×
963
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
964
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
965
                }); err != nil {
×
966
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
967
                }
×
968
        }
969

970
        if config.EnableNP {
×
971
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
972
                        AddFunc:    controller.enqueueAddNp,
×
973
                        UpdateFunc: controller.enqueueUpdateNp,
×
974
                        DeleteFunc: controller.enqueueDeleteNp,
×
975
                }); err != nil {
×
976
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
977
                }
×
978
        }
979

980
        if config.EnableANP {
×
981
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
982
                        AddFunc:    controller.enqueueAddAnp,
×
983
                        UpdateFunc: controller.enqueueUpdateAnp,
×
984
                        DeleteFunc: controller.enqueueDeleteAnp,
×
985
                }); err != nil {
×
986
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
987
                }
×
988

989
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
990
                        AddFunc:    controller.enqueueAddBanp,
×
991
                        UpdateFunc: controller.enqueueUpdateBanp,
×
992
                        DeleteFunc: controller.enqueueDeleteBanp,
×
993
                }); err != nil {
×
994
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
995
                }
×
996

997
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
998
                        AddFunc:    controller.enqueueAddCnp,
×
999
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1000
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1001
                }); err != nil {
×
1002
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1003
                }
×
1004

1005
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1006
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1007
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1008
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1009
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1010
        }
1011

1012
        if config.EnableDNSNameResolver {
×
1013
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1014
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1015
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1016
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1017
                }); err != nil {
×
1018
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1019
                }
×
1020
        }
1021

1022
        if config.EnableOVNIPSec {
×
1023
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1024
                        AddFunc:    controller.enqueueAddCsr,
×
1025
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1026
                        // no need to add delete func for csr
×
1027
                }); err != nil {
×
1028
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1029
                }
×
1030
        }
1031

1032
        controller.Run(ctx)
×
1033
}
1034

1035
// Run will set up the event handlers for types we are interested in, as well
1036
// as syncing informer caches and starting workers. It will block until stopCh
1037
// is closed, at which point it will shutdown the workqueue and wait for
1038
// workers to finish processing their current work items.
1039
func (c *Controller) Run(ctx context.Context) {
×
1040
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1041
        // Otherwise, the init process should be placed after all workers have already started working
×
1042
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1043
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1044
        }
×
1045

1046
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1047
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1048
        }
×
1049

1050
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1051
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1052
        }
×
1053

1054
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1055
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1056
        }
×
1057

1058
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1059
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1060
        }
×
1061

1062
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1063
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1064
        }
×
1065

1066
        if err := c.InitOVN(); err != nil {
×
1067
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1068
        }
×
1069

1070
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1071
        if err := c.syncIPCR(); err != nil {
×
1072
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1073
        }
×
1074

1075
        if err := c.syncFinalizers(); err != nil {
×
1076
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1077
        }
×
1078

1079
        if err := c.InitIPAM(); err != nil {
×
1080
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1081
        }
×
1082

1083
        if err := c.syncNodeRoutes(); err != nil {
×
1084
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1085
        }
×
1086

1087
        if err := c.syncSubnetCR(); err != nil {
×
1088
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1089
        }
×
1090

1091
        if err := c.syncVlanCR(); err != nil {
×
1092
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1093
        }
×
1094

1095
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1096
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1097
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1098
                }
×
1099
        }
1100

1101
        // start workers to do all the network operations
1102
        c.startWorkers(ctx)
×
1103

×
1104
        c.initResourceOnce()
×
1105
        <-ctx.Done()
×
1106
        klog.Info("Shutting down workers")
×
1107

×
1108
        c.OVNNbClient.Close()
×
1109
        c.OVNSbClient.Close()
×
1110
}
1111

1112
func (c *Controller) dbStatus() {
×
1113
        const maxFailures = 5
×
1114

×
1115
        done := make(chan error, 2)
×
1116
        go func() {
×
1117
                done <- c.OVNNbClient.Echo(context.Background())
×
1118
        }()
×
1119
        go func() {
×
1120
                done <- c.OVNSbClient.Echo(context.Background())
×
1121
        }()
×
1122

1123
        resultsReceived := 0
×
1124
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1125

×
1126
        for resultsReceived < 2 {
×
1127
                select {
×
1128
                case err := <-done:
×
1129
                        resultsReceived++
×
1130
                        if err != nil {
×
1131
                                c.dbFailureCount++
×
1132
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1133
                                if c.dbFailureCount >= maxFailures {
×
1134
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1135
                                }
×
1136
                                return
×
1137
                        }
1138
                case <-timeout:
×
1139
                        c.dbFailureCount++
×
1140
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1141
                        if c.dbFailureCount >= maxFailures {
×
1142
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1143
                        }
×
1144
                        return
×
1145
                }
1146
        }
1147

1148
        if c.dbFailureCount > 0 {
×
1149
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1150
                c.dbFailureCount = 0
×
1151
        }
×
1152
}
1153

1154
func (c *Controller) shutdown() {
×
1155
        utilruntime.HandleCrash()
×
1156

×
1157
        c.addOrUpdatePodQueue.ShutDown()
×
1158
        c.deletePodQueue.ShutDown()
×
1159
        c.updatePodSecurityQueue.ShutDown()
×
1160

×
1161
        c.addNamespaceQueue.ShutDown()
×
1162

×
1163
        c.addOrUpdateSubnetQueue.ShutDown()
×
1164
        c.deleteSubnetQueue.ShutDown()
×
1165
        c.updateSubnetStatusQueue.ShutDown()
×
1166
        c.syncVirtualPortsQueue.ShutDown()
×
1167

×
1168
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1169
        c.updateIPPoolStatusQueue.ShutDown()
×
1170
        c.deleteIPPoolQueue.ShutDown()
×
1171

×
1172
        c.addNodeQueue.ShutDown()
×
1173
        c.updateNodeQueue.ShutDown()
×
1174
        c.deleteNodeQueue.ShutDown()
×
1175

×
1176
        c.addServiceQueue.ShutDown()
×
1177
        c.deleteServiceQueue.ShutDown()
×
1178
        c.updateServiceQueue.ShutDown()
×
1179
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1180

×
1181
        c.addVlanQueue.ShutDown()
×
1182
        c.delVlanQueue.ShutDown()
×
1183
        c.updateVlanQueue.ShutDown()
×
1184

×
1185
        c.addOrUpdateVpcQueue.ShutDown()
×
1186
        c.updateVpcStatusQueue.ShutDown()
×
1187
        c.delVpcQueue.ShutDown()
×
1188

×
1189
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1190
        c.initVpcNatGatewayQueue.ShutDown()
×
1191
        c.delVpcNatGatewayQueue.ShutDown()
×
1192
        c.updateVpcEipQueue.ShutDown()
×
1193
        c.updateVpcFloatingIPQueue.ShutDown()
×
1194
        c.updateVpcDnatQueue.ShutDown()
×
1195
        c.updateVpcSnatQueue.ShutDown()
×
1196
        c.updateVpcSubnetQueue.ShutDown()
×
1197

×
1198
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1199
        c.delVpcEgressGatewayQueue.ShutDown()
×
1200

×
1201
        if c.config.EnableLb {
×
1202
                c.addSwitchLBRuleQueue.ShutDown()
×
1203
                c.delSwitchLBRuleQueue.ShutDown()
×
1204
                c.updateSwitchLBRuleQueue.ShutDown()
×
1205

×
1206
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1207
                c.delVpcDNSQueue.ShutDown()
×
1208
        }
×
1209

1210
        c.addIPQueue.ShutDown()
×
1211
        c.updateIPQueue.ShutDown()
×
1212
        c.delIPQueue.ShutDown()
×
1213

×
1214
        c.addVirtualIPQueue.ShutDown()
×
1215
        c.updateVirtualIPQueue.ShutDown()
×
1216
        c.updateVirtualParentsQueue.ShutDown()
×
1217
        c.delVirtualIPQueue.ShutDown()
×
1218

×
1219
        c.addIptablesEipQueue.ShutDown()
×
1220
        c.updateIptablesEipQueue.ShutDown()
×
1221
        c.resetIptablesEipQueue.ShutDown()
×
1222
        c.delIptablesEipQueue.ShutDown()
×
1223

×
1224
        c.addIptablesFipQueue.ShutDown()
×
1225
        c.updateIptablesFipQueue.ShutDown()
×
1226
        c.delIptablesFipQueue.ShutDown()
×
1227

×
1228
        c.addIptablesDnatRuleQueue.ShutDown()
×
1229
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1230
        c.delIptablesDnatRuleQueue.ShutDown()
×
1231

×
1232
        c.addIptablesSnatRuleQueue.ShutDown()
×
1233
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1234
        c.delIptablesSnatRuleQueue.ShutDown()
×
1235

×
1236
        c.addQoSPolicyQueue.ShutDown()
×
1237
        c.updateQoSPolicyQueue.ShutDown()
×
1238
        c.delQoSPolicyQueue.ShutDown()
×
1239

×
1240
        c.addOvnEipQueue.ShutDown()
×
1241
        c.updateOvnEipQueue.ShutDown()
×
1242
        c.resetOvnEipQueue.ShutDown()
×
1243
        c.delOvnEipQueue.ShutDown()
×
1244

×
1245
        c.addOvnFipQueue.ShutDown()
×
1246
        c.updateOvnFipQueue.ShutDown()
×
1247
        c.delOvnFipQueue.ShutDown()
×
1248

×
1249
        c.addOvnSnatRuleQueue.ShutDown()
×
1250
        c.updateOvnSnatRuleQueue.ShutDown()
×
1251
        c.delOvnSnatRuleQueue.ShutDown()
×
1252

×
1253
        c.addOvnDnatRuleQueue.ShutDown()
×
1254
        c.updateOvnDnatRuleQueue.ShutDown()
×
1255
        c.delOvnDnatRuleQueue.ShutDown()
×
1256

×
1257
        if c.config.EnableNP {
×
1258
                c.updateNpQueue.ShutDown()
×
1259
                c.deleteNpQueue.ShutDown()
×
1260
        }
×
1261
        if c.config.EnableANP {
×
1262
                c.addAnpQueue.ShutDown()
×
1263
                c.updateAnpQueue.ShutDown()
×
1264
                c.deleteAnpQueue.ShutDown()
×
1265

×
1266
                c.addBanpQueue.ShutDown()
×
1267
                c.updateBanpQueue.ShutDown()
×
1268
                c.deleteBanpQueue.ShutDown()
×
1269

×
1270
                c.addCnpQueue.ShutDown()
×
1271
                c.updateCnpQueue.ShutDown()
×
1272
                c.deleteCnpQueue.ShutDown()
×
1273
        }
×
1274

1275
        if c.config.EnableDNSNameResolver {
×
1276
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1277
                c.deleteDNSNameResolverQueue.ShutDown()
×
1278
        }
×
1279

1280
        c.addOrUpdateSgQueue.ShutDown()
×
1281
        c.delSgQueue.ShutDown()
×
1282
        c.syncSgPortsQueue.ShutDown()
×
1283

×
1284
        c.addOrUpdateCsrQueue.ShutDown()
×
1285

×
1286
        if c.config.EnableLiveMigrationOptimize {
×
1287
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1288
        }
×
1289
}
1290

1291
func (c *Controller) startWorkers(ctx context.Context) {
×
1292
        klog.Info("Starting workers")
×
1293

×
1294
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1295
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1296
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1297

×
1298
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1301
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1302
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1303
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1305
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1306
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1307
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1308
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1309
        // add default and join subnet and wait them ready
×
1310
        for range c.config.WorkerNum {
×
1311
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1312
        }
×
1313
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1314
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1315
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1316
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1317
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1318
                klog.Infof("wait for subnets %v ready", subnets)
×
1319

×
1320
                return c.allSubnetReady(subnets...)
×
1321
        })
×
1322
        if err != nil {
×
1323
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1324
        }
×
1325

1326
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1327
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1328
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1329

×
1330
        // run node worker before handle any pods
×
1331
        for range c.config.WorkerNum {
×
1332
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1333
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1334
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1335
        }
×
1336
        for {
×
1337
                ready := true
×
1338
                time.Sleep(3 * time.Second)
×
1339
                nodes, err := c.nodesLister.List(labels.Everything())
×
1340
                if err != nil {
×
1341
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1342
                }
×
1343
                for _, node := range nodes {
×
1344
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1345
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1346
                                ready = false
×
1347
                                break
×
1348
                        }
1349
                }
1350
                if ready {
×
1351
                        break
×
1352
                }
1353
        }
1354

1355
        if c.config.EnableLb {
×
1356
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1357
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1358
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1359

×
1360
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1361
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1362
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1363

×
1364
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1365
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1366
                go wait.Until(func() {
×
1367
                        c.resyncVpcDNSConfig()
×
1368
                }, 5*time.Second, ctx.Done())
×
1369
        }
1370

1371
        for range c.config.WorkerNum {
×
1372
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1373
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1374
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1375

×
1376
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1377
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1378
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1379
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1380
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1381

×
1382
                if c.config.EnableLb {
×
1383
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1384
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1385
                }
×
1386

1387
                if c.config.EnableNP {
×
1388
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1389
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1390
                }
×
1391

1392
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1393
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1394
        }
1395

1396
        if c.config.EnableEipSnat {
×
1397
                go wait.Until(func() {
×
1398
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1399
                        c.resyncExternalGateway()
×
1400
                }, time.Second, ctx.Done())
×
1401

1402
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1403
                c.OVNNbClient.MonitorBFD()
×
1404
        }
1405
        // TODO: we should merge these two vpc nat config into one config and resync them together
1406
        go wait.Until(func() {
×
1407
                c.resyncVpcNatGwConfig()
×
1408
        }, time.Second, ctx.Done())
×
1409

1410
        go wait.Until(func() {
×
1411
                c.resyncVpcNatConfig()
×
1412
        }, time.Second, ctx.Done())
×
1413

1414
        if c.config.GCInterval != 0 {
×
1415
                go wait.Until(func() {
×
1416
                        if err := c.markAndCleanLSP(); err != nil {
×
1417
                                klog.Errorf("gc lsp error: %v", err)
×
1418
                        }
×
1419
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1420
        }
1421

1422
        go wait.Until(func() {
×
1423
                if err := c.inspectPod(); err != nil {
×
1424
                        klog.Errorf("inspection error: %v", err)
×
1425
                }
×
1426
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1427

1428
        if c.config.EnableExternalVpc {
×
1429
                go wait.Until(func() {
×
1430
                        c.syncExternalVpc()
×
1431
                }, 5*time.Second, ctx.Done())
×
1432
        }
1433

1434
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1435
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1436
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1437
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1438

×
1439
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1441
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1442
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1443

×
1444
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1446
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1447

×
1448
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1450
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1451

×
1452
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1454
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1455

×
1456
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1457

×
1458
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1459
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1460
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1461

×
1462
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1463
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1464
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1465
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1466

×
1467
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1468
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1469
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1470
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1471

×
1472
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1473
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1474
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1475

×
1476
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1477
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1478
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1479

×
1480
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1481
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1482
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1483

×
1484
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1485
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1486
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1487

×
1488
        if c.config.EnableANP {
×
1489
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1490
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1491
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1492

×
1493
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1494
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1495
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1496

×
1497
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1498
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1499
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1500
        }
×
1501

1502
        if c.config.EnableDNSNameResolver {
×
1503
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1504
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1505
        }
×
1506

1507
        if c.config.EnableLiveMigrationOptimize {
×
1508
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1509
        }
×
1510

1511
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1512

×
1513
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1514
}
1515

1516
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1517
        for _, lsName := range subnets {
2✔
1518
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1519
                if err != nil {
1✔
1520
                        klog.Error(err)
×
1521
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1522
                }
×
1523

1524
                if !exist {
2✔
1525
                        return false, nil
1✔
1526
                }
1✔
1527
        }
1528

1529
        return true, nil
1✔
1530
}
1531

1532
func (c *Controller) initResourceOnce() {
×
1533
        c.registerSubnetMetrics()
×
1534

×
1535
        if err := c.initNodeChassis(); err != nil {
×
1536
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1537
        }
×
1538

1539
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1540
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1541
        }
×
1542
        if err := c.syncSecurityGroup(); err != nil {
×
1543
                util.LogFatalAndExit(err, "failed to sync security group")
×
1544
        }
×
1545

1546
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1547
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1548
        }
×
1549

1550
        if err := c.initVpcNatGw(); err != nil {
×
1551
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1552
        }
×
1553
        if c.config.EnableLb {
×
1554
                if err := c.initVpcDNSConfig(); err != nil {
×
1555
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1556
                }
×
1557
        }
1558

1559
        // remove resources in ovndb that not exist any more in kubernetes resources
1560
        // process gc at last in case of affecting other init process
1561
        if err := c.gc(); err != nil {
×
1562
                util.LogFatalAndExit(err, "failed to run gc")
×
1563
        }
×
1564
}
1565

1566
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1567
        item, shutdown := queue.Get()
×
1568
        if shutdown {
×
1569
                return false
×
1570
        }
×
1571

1572
        err := func(item T) error {
×
1573
                defer queue.Done(item)
×
1574
                if err := handler(item); err != nil {
×
1575
                        queue.AddRateLimited(item)
×
1576
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1577
                }
×
1578
                queue.Forget(item)
×
1579
                return nil
×
1580
        }(item)
1581
        if err != nil {
×
1582
                utilruntime.HandleError(err)
×
1583
                return true
×
1584
        }
×
1585
        return true
×
1586
}
1587

1588
func getWorkItemKey(obj any) string {
×
1589
        switch v := obj.(type) {
×
1590
        case string:
×
1591
                return v
×
1592
        case *vpcService:
×
1593
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1594
        case *AdminNetworkPolicyChangedDelta:
×
1595
                return v.key
×
1596
        case *SlrInfo:
×
1597
                return v.Name
×
1598
        default:
×
1599
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1600
                if err != nil {
×
1601
                        utilruntime.HandleError(err)
×
1602
                        return ""
×
1603
                }
×
1604
                return key
×
1605
        }
1606
}
1607

1608
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1609
        return func() {
×
1610
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1611
                }
×
1612
        }
1613
}
1614

1615
// apiResourceExists checks if all specified kinds exist in the given group version.
1616
// It returns true if all kinds are found, false otherwise.
1617
// Parameters:
1618
// - discoveryClient: The discovery client to use for querying API resources.
1619
// - gv: The group version string (e.g., "apps/v1").
1620
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1621
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1622
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1623
        if err != nil {
×
1624
                if k8serrors.IsNotFound(err) {
×
1625
                        return false, nil
×
1626
                }
×
1627
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1628
        }
1629

1630
        existingKinds := set.New[string]()
×
1631
        for _, apiResource := range apiResourceLists.APIResources {
×
1632
                existingKinds.Insert(apiResource.Kind)
×
1633
        }
×
1634

1635
        return existingKinds.HasAll(kinds...), nil
×
1636
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc