• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27260652328

10 Jun 2026 07:32AM UTC coverage: 24.294% (+0.08%) from 24.215%
27260652328

push

github

oilbeater
fix(controller): avoid nil queue panic in DNS name resolver handlers (#6845)

When the controller starts with --enable-dns-name-resolver=true but
--enable-anp=false, updateAnpQueue and updateCnpQueue are never
constructed. Any DNSNameResolver CR carrying the "anp" label (e.g. left
over from a period when ANP was enabled) then drives
handleAddOrUpdateDNSNameResolver/handleDeleteDNSNameResolver into
calling Add on a nil workqueue, which panics the whole controller
process and forms a crash loop on every restart.

Skip the handlers when ANP support is disabled and warn at startup
about the ineffective configuration.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
(cherry picked from commit 4d9089020)

10 of 13 new or added lines in 2 files covered. (76.92%)

2 existing lines in 1 file now uncovered.

13339 of 54907 relevant lines covered (24.29%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.14
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/discovery"
23
        kubeinformers "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        appsv1 "k8s.io/client-go/listers/apps/v1"
27
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
28
        v1 "k8s.io/client-go/listers/core/v1"
29
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
30
        netv1 "k8s.io/client-go/listers/networking/v1"
31
        "k8s.io/client-go/tools/cache"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        "k8s.io/klog/v2"
35
        "k8s.io/utils/keymutex"
36
        "k8s.io/utils/set"
37
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
38
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
39
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
40
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
41
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
42

43
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
44
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
45
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
46
        "github.com/kubeovn/kube-ovn/pkg/informer"
47
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
48
        "github.com/kubeovn/kube-ovn/pkg/ovs"
49
        "github.com/kubeovn/kube-ovn/pkg/util"
50
)
51

52
const controllerAgentName = "kube-ovn-controller"
53

54
const (
55
        logicalSwitchKey              = "ls"
56
        logicalRouterKey              = "lr"
57
        portGroupKey                  = "pg"
58
        networkPolicyKey              = "np"
59
        sgKey                         = "sg"
60
        sgsKey                        = "security_groups"
61
        u2oKey                        = "u2o"
62
        adminNetworkPolicyKey         = "anp"
63
        baselineAdminNetworkPolicyKey = "banp"
64
        ippoolKey                     = "ippool"
65
        clusterNetworkPolicyKey       = "cnp"
66
)
67

68
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
69
type Controller struct {
70
        config *Configuration
71

72
        ipam             *ovnipam.IPAM
73
        namedPort        *NamedPort
74
        anpPrioNameMap   map[int32]string
75
        anpNamePrioMap   map[string]int32
76
        bnpPrioNameMap   map[int32]string
77
        bnpNamePrioMap   map[string]int32
78
        priorityMapMutex sync.RWMutex
79

80
        OVNNbClient ovs.NbClient
81
        OVNSbClient ovs.SbClient
82

83
        // ExternalGatewayType define external gateway type, centralized
84
        ExternalGatewayType string
85

86
        podsLister             v1.PodLister
87
        podsSynced             cache.InformerSynced
88
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
89
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
90
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
91
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
92
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
93
        podKeyMutex            keymutex.KeyMutex
94

95
        vpcsLister           kubeovnlister.VpcLister
96
        vpcSynced            cache.InformerSynced
97
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
98
        vpcLastPoliciesMap   *xsync.Map[string, string]
99
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
100
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
101
        vpcKeyMutex          keymutex.KeyMutex
102

103
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
104
        vpcNatGatewaySynced           cache.InformerSynced
105
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
106
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
107
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
108
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
109
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
110
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
112
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
113
        vpcNatGwKeyMutex              keymutex.KeyMutex
114
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
115

116
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
117
        vpcEgressGatewaySynced           cache.InformerSynced
118
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
119
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
120
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
121

122
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
123
        switchLBRuleSynced      cache.InformerSynced
124
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
125
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
126
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
127

128
        vpcDNSLister           kubeovnlister.VpcDnsLister
129
        vpcDNSSynced           cache.InformerSynced
130
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
131
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
132

133
        subnetsLister           kubeovnlister.SubnetLister
134
        subnetSynced            cache.InformerSynced
135
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
136
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
137
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
138
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
139
        subnetKeyMutex          keymutex.KeyMutex
140

141
        ippoolLister            kubeovnlister.IPPoolLister
142
        ippoolSynced            cache.InformerSynced
143
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
144
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
145
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
146
        ippoolKeyMutex          keymutex.KeyMutex
147

148
        ipsLister     kubeovnlister.IPLister
149
        ipSynced      cache.InformerSynced
150
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
151
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
152
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
153

154
        virtualIpsLister          kubeovnlister.VipLister
155
        virtualIpsSynced          cache.InformerSynced
156
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
157
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
158
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
159
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
160

161
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
162
        iptablesEipSynced      cache.InformerSynced
163
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
164
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
165
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
166
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
167

168
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
169
        iptablesFipSynced      cache.InformerSynced
170
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
171
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
172
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
173

174
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
175
        iptablesDnatRuleSynced      cache.InformerSynced
176
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
177
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
178
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
179

180
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
181
        iptablesSnatRuleSynced      cache.InformerSynced
182
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
183
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
184
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
185

186
        ovnEipsLister     kubeovnlister.OvnEipLister
187
        ovnEipSynced      cache.InformerSynced
188
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
189
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
190
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
191
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
192

193
        ovnFipsLister     kubeovnlister.OvnFipLister
194
        ovnFipSynced      cache.InformerSynced
195
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
196
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
197
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
198

199
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
200
        ovnSnatRuleSynced      cache.InformerSynced
201
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
202
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
203
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
204

205
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
206
        ovnDnatRuleSynced      cache.InformerSynced
207
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
208
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
209
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
210

211
        providerNetworksLister kubeovnlister.ProviderNetworkLister
212
        providerNetworkSynced  cache.InformerSynced
213

214
        vlansLister     kubeovnlister.VlanLister
215
        vlanSynced      cache.InformerSynced
216
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
217
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
218
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
219
        vlanKeyMutex    keymutex.KeyMutex
220

221
        namespacesLister  v1.NamespaceLister
222
        namespacesSynced  cache.InformerSynced
223
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
224
        nsKeyMutex        keymutex.KeyMutex
225

226
        nodesLister     v1.NodeLister
227
        nodesSynced     cache.InformerSynced
228
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
229
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
230
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
231
        nodeKeyMutex    keymutex.KeyMutex
232

233
        servicesLister     v1.ServiceLister
234
        serviceSynced      cache.InformerSynced
235
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
236
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
237
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
238
        svcKeyMutex        keymutex.KeyMutex
239

240
        endpointSlicesLister          discoveryv1.EndpointSliceLister
241
        endpointSlicesSynced          cache.InformerSynced
242
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
243
        epKeyMutex                    keymutex.KeyMutex
244

245
        deploymentsLister appsv1.DeploymentLister
246
        deploymentsSynced cache.InformerSynced
247

248
        npsLister     netv1.NetworkPolicyLister
249
        npsSynced     cache.InformerSynced
250
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
251
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
252
        npKeyMutex    keymutex.KeyMutex
253

254
        sgsLister          kubeovnlister.SecurityGroupLister
255
        sgSynced           cache.InformerSynced
256
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
257
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
258
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
259
        sgKeyMutex         keymutex.KeyMutex
260

261
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
262
        qosPolicySynced      cache.InformerSynced
263
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
264
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
265
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
266

267
        configMapsLister v1.ConfigMapLister
268
        configMapsSynced cache.InformerSynced
269

270
        anpsLister     anplister.AdminNetworkPolicyLister
271
        anpsSynced     cache.InformerSynced
272
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
273
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
274
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
275
        anpKeyMutex    keymutex.KeyMutex
276

277
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
278
        dnsNameResolversSynced          cache.InformerSynced
279
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
280
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
281

282
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
283
        banpsSynced     cache.InformerSynced
284
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
285
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
286
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
287
        banpKeyMutex    keymutex.KeyMutex
288

289
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
290
        cnpsSynced     cache.InformerSynced
291
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
292
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
293
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
294
        cnpKeyMutex    keymutex.KeyMutex
295

296
        csrLister           certListerv1.CertificateSigningRequestLister
297
        csrSynced           cache.InformerSynced
298
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
299

300
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
301
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
302
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
303

304
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
305
        netAttachSynced          cache.InformerSynced
306
        netAttachInformerFactory netAttach.SharedInformerFactory
307

308
        recorder               record.EventRecorder
309
        informerFactory        kubeinformers.SharedInformerFactory
310
        cmInformerFactory      kubeinformers.SharedInformerFactory
311
        deployInformerFactory  kubeinformers.SharedInformerFactory
312
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
313
        anpInformerFactory     anpinformer.SharedInformerFactory
314

315
        // Database health check
316
        dbFailureCount int
317

318
        distributedSubnetNeedSync atomic.Bool
319
}
320

321
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
322
        if rateLimiter == nil {
2✔
323
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
324
        }
1✔
325
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
326
}
327

328
// Run creates and runs a new ovn controller
329
func Run(ctx context.Context, config *Configuration) {
×
330
        klog.V(4).Info("Creating event broadcaster")
×
331
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
332
        eventBroadcaster.StartLogging(klog.Infof)
×
333
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
334
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
335
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
336
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
337
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
338
        )
×
339

×
340
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
341
        if err != nil {
×
342
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
343
        }
×
344

345
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
346
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
347
                        listOption.AllowWatchBookmarks = true
×
348
                }))
×
349
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
350
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
351
                        listOption.AllowWatchBookmarks = true
×
352
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
353
        // deployment informer used to list/watch vpc egress gateway workloads
354
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
355
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
356
                        listOption.AllowWatchBookmarks = true
×
357
                        listOption.LabelSelector = selector.String()
×
358
                }))
×
359
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
360
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
361
                        listOption.AllowWatchBookmarks = true
×
362
                }))
×
363
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
364
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
365
                        listOption.AllowWatchBookmarks = true
×
366
                }))
×
367

368
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
369
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
370
                        listOption.AllowWatchBookmarks = true
×
371
                }),
×
372
        )
373

374
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
375

×
376
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
377
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
378
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
379
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
380
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
381
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
382
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
383
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
384
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
385
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
386
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
387
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
388
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
389
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
390
        podInformer := informerFactory.Core().V1().Pods()
×
391
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
392
        nodeInformer := informerFactory.Core().V1().Nodes()
×
393
        serviceInformer := informerFactory.Core().V1().Services()
×
394
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
395
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
396
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
397
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
398
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
399
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
400
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
401
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
402
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
403
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
404
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
405
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
406
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
407
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
408
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
409
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
410
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
411

×
412
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
413
        controller := &Controller{
×
414
                config:             config,
×
415
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
416
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
417
                ipam:               ovnipam.NewIPAM(),
×
418
                namedPort:          NewNamedPort(),
×
419

×
420
                vpcsLister:           vpcInformer.Lister(),
×
421
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
422
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
423
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
424
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
425
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
426
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
427

×
428
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
429
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
430
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
431
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
432
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
433
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
434
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
435
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
436
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
437
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
438
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
439
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
440
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
441
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
442
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
443
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
444
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
445

×
446
                subnetsLister:           subnetInformer.Lister(),
×
447
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
448
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
449
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
450
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
451
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
452
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
453

×
454
                ippoolLister:            ippoolInformer.Lister(),
×
455
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
456
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
457
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
458
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
459
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
460

×
461
                ipsLister:     ipInformer.Lister(),
×
462
                ipSynced:      ipInformer.Informer().HasSynced,
×
463
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
464
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
465
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
466

×
467
                virtualIpsLister:          virtualIPInformer.Lister(),
×
468
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
469
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
470
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
471
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
472
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
473

×
474
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
475
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
476
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
477
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
478
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
479
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
480

×
481
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
482
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
483
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
484
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
485
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
486

×
487
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
488
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
489
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
490
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
491
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
492

×
493
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
494
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
495
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
496
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
497
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
498

×
499
                vlansLister:     vlanInformer.Lister(),
×
500
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
501
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
502
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
503
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
504
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
505

×
506
                providerNetworksLister: providerNetworkInformer.Lister(),
×
507
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
508

×
509
                podsLister:          podInformer.Lister(),
×
510
                podsSynced:          podInformer.Informer().HasSynced,
×
511
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
512
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
513
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
514
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
515
                                Name:          "DeletePod",
×
516
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
517
                        },
×
518
                ),
×
519
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
520
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
521

×
522
                namespacesLister:  namespaceInformer.Lister(),
×
523
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
524
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
525
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
526

×
527
                nodesLister:     nodeInformer.Lister(),
×
528
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
529
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
530
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
531
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
532
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
533

×
534
                servicesLister:     serviceInformer.Lister(),
×
535
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
536
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
537
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
538
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
539
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
540

×
541
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
542
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
543
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
544
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
545

×
546
                deploymentsLister: deploymentInformer.Lister(),
×
547
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
548

×
549
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
550
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
551
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
552
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
553
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
554

×
555
                configMapsLister: configMapInformer.Lister(),
×
556
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
557

×
558
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
559
                sgsLister:          sgInformer.Lister(),
×
560
                sgSynced:           sgInformer.Informer().HasSynced,
×
561
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
562
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
563
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
564

×
565
                ovnEipsLister:     ovnEipInformer.Lister(),
×
566
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
567
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
568
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
569
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
570
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
571

×
572
                ovnFipsLister:     ovnFipInformer.Lister(),
×
573
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
574
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
575
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
576
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
577

×
578
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
579
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
580
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
581
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
582
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
583

×
584
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
585
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
586
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
587
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
588
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
589

×
590
                csrLister:           csrInformer.Lister(),
×
591
                csrSynced:           csrInformer.Informer().HasSynced,
×
592
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
593

×
594
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
595
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
596
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
597

×
598
                netAttachLister:          netAttachInformer.Lister(),
×
599
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
600
                netAttachInformerFactory: attachNetInformerFactory,
×
601

×
602
                recorder:               recorder,
×
603
                informerFactory:        informerFactory,
×
604
                cmInformerFactory:      cmInformerFactory,
×
605
                deployInformerFactory:  deployInformerFactory,
×
606
                kubeovnInformerFactory: kubeovnInformerFactory,
×
607
                anpInformerFactory:     anpInformerFactory,
×
608
        }
×
609

×
610
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
611
                config.OvnNbAddr,
×
612
                config.OvnTimeout,
×
613
                config.OvsDbConnectTimeout,
×
614
                config.OvsDbInactivityTimeout,
×
615
                config.OvsDbConnectMaxRetry,
×
616
        ); err != nil {
×
617
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
618
        }
×
619
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
620
                config.OvnSbAddr,
×
621
                config.OvnTimeout,
×
622
                config.OvsDbConnectTimeout,
×
623
                config.OvsDbInactivityTimeout,
×
624
                config.OvsDbConnectMaxRetry,
×
625
        ); err != nil {
×
626
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
627
        }
×
628
        if config.EnableLb {
×
629
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
630
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
631
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
632
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
633
                        "DeleteSwitchLBRule",
×
634
                        workqueue.NewTypedMaxOfRateLimiter(
×
635
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
636
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
637
                        ),
×
638
                )
×
639
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
640
                        "UpdateSwitchLBRule",
×
641
                        workqueue.NewTypedMaxOfRateLimiter(
×
642
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
643
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
644
                        ),
×
645
                )
×
646

×
647
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
648
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
649
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
650
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
651
        }
×
652

653
        if config.EnableNP {
×
654
                controller.npsLister = npInformer.Lister()
×
655
                controller.npsSynced = npInformer.Informer().HasSynced
×
656
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
657
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
658
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
659
        }
×
660

661
        if config.EnableANP {
×
662
                controller.anpsLister = anpInformer.Lister()
×
663
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
664
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
665
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
666
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
667
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
668

×
669
                controller.banpsLister = banpInformer.Lister()
×
670
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
671
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
672
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
673
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
674
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
675

×
676
                controller.cnpsLister = cnpInformer.Lister()
×
677
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
678
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
679
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
680
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
681
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
682
        }
×
683

684
        if config.EnableDNSNameResolver {
×
NEW
685
                if !config.EnableANP {
×
NEW
686
                        klog.Warning("DNS name resolver is enabled but ANP support is disabled, DNSNameResolver resources will not take effect")
×
NEW
687
                }
×
688
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
689
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
690
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
691
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
692
        }
693

694
        defer controller.shutdown()
×
695
        klog.Info("Starting OVN controller")
×
696

×
697
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
698
        // NAD CRD is optional, so we check if it exists before starting the informer
×
699
        controller.StartNetAttachInformerFactory(ctx)
×
700

×
701
        // Wait for the caches to be synced before starting workers
×
702
        controller.informerFactory.Start(ctx.Done())
×
703
        controller.cmInformerFactory.Start(ctx.Done())
×
704
        controller.deployInformerFactory.Start(ctx.Done())
×
705
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
706
        controller.anpInformerFactory.Start(ctx.Done())
×
707
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
708

×
709
        klog.Info("Waiting for informer caches to sync")
×
710
        cacheSyncs := []cache.InformerSynced{
×
711
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
712
                controller.vpcSynced, controller.subnetSynced,
×
713
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
714
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
715
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
716
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
717
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
718
                controller.ovnDnatRuleSynced,
×
719
        }
×
720
        if controller.config.EnableLb {
×
721
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
722
        }
×
723
        if controller.config.EnableNP {
×
724
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
725
        }
×
726
        if controller.config.EnableANP {
×
727
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
728
        }
×
729
        if controller.config.EnableDNSNameResolver {
×
730
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
731
        }
×
732

733
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
734
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
735
        }
×
736

737
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
738
                AddFunc:    controller.enqueueAddPod,
×
739
                DeleteFunc: controller.enqueueDeletePod,
×
740
                UpdateFunc: controller.enqueueUpdatePod,
×
741
        }); err != nil {
×
742
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
743
        }
×
744

745
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
746
                AddFunc:    controller.enqueueAddNamespace,
×
747
                UpdateFunc: controller.enqueueUpdateNamespace,
×
748
                DeleteFunc: controller.enqueueDeleteNamespace,
×
749
        }); err != nil {
×
750
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
751
        }
×
752

753
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
754
                AddFunc:    controller.enqueueAddNode,
×
755
                UpdateFunc: controller.enqueueUpdateNode,
×
756
                DeleteFunc: controller.enqueueDeleteNode,
×
757
        }); err != nil {
×
758
                util.LogFatalAndExit(err, "failed to add node event handler")
×
759
        }
×
760

761
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
762
                AddFunc:    controller.enqueueAddService,
×
763
                DeleteFunc: controller.enqueueDeleteService,
×
764
                UpdateFunc: controller.enqueueUpdateService,
×
765
        }); err != nil {
×
766
                util.LogFatalAndExit(err, "failed to add service event handler")
×
767
        }
×
768

769
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
770
                AddFunc:    controller.enqueueAddEndpointSlice,
×
771
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
772
        }); err != nil {
×
773
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
774
        }
×
775

776
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
777
                AddFunc:    controller.enqueueAddDeployment,
×
778
                UpdateFunc: controller.enqueueUpdateDeployment,
×
779
        }); err != nil {
×
780
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
781
        }
×
782

783
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
784
                AddFunc:    controller.enqueueAddVpc,
×
785
                UpdateFunc: controller.enqueueUpdateVpc,
×
786
                DeleteFunc: controller.enqueueDelVpc,
×
787
        }); err != nil {
×
788
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
789
        }
×
790

791
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
792
                AddFunc:    controller.enqueueAddVpcNatGw,
×
793
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
794
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
795
        }); err != nil {
×
796
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
797
        }
×
798

799
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
800
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
801
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
802
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
803
        }); err != nil {
×
804
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
805
        }
×
806

807
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
808
                AddFunc:    controller.enqueueAddSubnet,
×
809
                UpdateFunc: controller.enqueueUpdateSubnet,
×
810
                DeleteFunc: controller.enqueueDeleteSubnet,
×
811
        }); err != nil {
×
812
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
813
        }
×
814

815
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
816
                AddFunc:    controller.enqueueAddIPPool,
×
817
                UpdateFunc: controller.enqueueUpdateIPPool,
×
818
                DeleteFunc: controller.enqueueDeleteIPPool,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
821
        }
×
822

823
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddIP,
×
825
                UpdateFunc: controller.enqueueUpdateIP,
×
826
                DeleteFunc: controller.enqueueDelIP,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
829
        }
×
830

831
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddVlan,
×
833
                DeleteFunc: controller.enqueueDelVlan,
×
834
                UpdateFunc: controller.enqueueUpdateVlan,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
837
        }
×
838

839
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddSg,
×
841
                DeleteFunc: controller.enqueueDeleteSg,
×
842
                UpdateFunc: controller.enqueueUpdateSg,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
845
        }
×
846

847
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddVirtualIP,
×
849
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
850
                DeleteFunc: controller.enqueueDelVirtualIP,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
853
        }
×
854

855
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                AddFunc:    controller.enqueueAddIptablesEip,
×
857
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
858
                DeleteFunc: controller.enqueueDelIptablesEip,
×
859
        }); err != nil {
×
860
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
861
        }
×
862

863
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                AddFunc:    controller.enqueueAddIptablesFip,
×
865
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
866
                DeleteFunc: controller.enqueueDelIptablesFip,
×
867
        }); err != nil {
×
868
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
869
        }
×
870

871
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
872
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
873
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
874
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
875
        }); err != nil {
×
876
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
877
        }
×
878

879
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
880
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
881
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
882
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
883
        }); err != nil {
×
884
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
885
        }
×
886

887
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
888
                AddFunc:    controller.enqueueAddOvnEip,
×
889
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
890
                DeleteFunc: controller.enqueueDelOvnEip,
×
891
        }); err != nil {
×
892
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
893
        }
×
894

895
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
896
                AddFunc:    controller.enqueueAddOvnFip,
×
897
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
898
                DeleteFunc: controller.enqueueDelOvnFip,
×
899
        }); err != nil {
×
900
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
901
        }
×
902

903
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
904
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
905
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
906
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
907
        }); err != nil {
×
908
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
909
        }
×
910

911
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
912
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
913
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
914
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
915
        }); err != nil {
×
916
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
917
        }
×
918

919
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
920
                AddFunc:    controller.enqueueAddQoSPolicy,
×
921
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
922
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
923
        }); err != nil {
×
924
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
925
        }
×
926

927
        if config.EnableLb {
×
928
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
929
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
930
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
931
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
932
                }); err != nil {
×
933
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
934
                }
×
935

936
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
937
                        AddFunc:    controller.enqueueAddVpcDNS,
×
938
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
939
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
940
                }); err != nil {
×
941
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
942
                }
×
943
        }
944

945
        if config.EnableNP {
×
946
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
947
                        AddFunc:    controller.enqueueAddNp,
×
948
                        UpdateFunc: controller.enqueueUpdateNp,
×
949
                        DeleteFunc: controller.enqueueDeleteNp,
×
950
                }); err != nil {
×
951
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
952
                }
×
953
        }
954

955
        if config.EnableANP {
×
956
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
957
                        AddFunc:    controller.enqueueAddAnp,
×
958
                        UpdateFunc: controller.enqueueUpdateAnp,
×
959
                        DeleteFunc: controller.enqueueDeleteAnp,
×
960
                }); err != nil {
×
961
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
962
                }
×
963

964
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
965
                        AddFunc:    controller.enqueueAddBanp,
×
966
                        UpdateFunc: controller.enqueueUpdateBanp,
×
967
                        DeleteFunc: controller.enqueueDeleteBanp,
×
968
                }); err != nil {
×
969
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
970
                }
×
971

972
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
973
                        AddFunc:    controller.enqueueAddCnp,
×
974
                        UpdateFunc: controller.enqueueUpdateCnp,
×
975
                        DeleteFunc: controller.enqueueDeleteCnp,
×
976
                }); err != nil {
×
977
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
978
                }
×
979

980
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
981
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
982
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
983
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
984
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
985
        }
986

987
        if config.EnableDNSNameResolver {
×
988
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
989
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
990
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
991
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
992
                }); err != nil {
×
993
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
994
                }
×
995
        }
996

997
        if config.EnableOVNIPSec {
×
998
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
999
                        AddFunc:    controller.enqueueAddCsr,
×
1000
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1001
                        // no need to add delete func for csr
×
1002
                }); err != nil {
×
1003
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1004
                }
×
1005
        }
1006

1007
        controller.Run(ctx)
×
1008
}
1009

1010
// Run will set up the event handlers for types we are interested in, as well
1011
// as syncing informer caches and starting workers. It will block until stopCh
1012
// is closed, at which point it will shutdown the workqueue and wait for
1013
// workers to finish processing their current work items.
1014
func (c *Controller) Run(ctx context.Context) {
×
1015
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1016
        // Otherwise, the init process should be placed after all workers have already started working
×
1017
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1018
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1019
        }
×
1020

1021
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1022
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1023
        }
×
1024

1025
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1026
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1027
        }
×
1028

1029
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1030
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1031
        }
×
1032

1033
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1034
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1035
        }
×
1036

1037
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1038
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1039
        }
×
1040

1041
        if err := c.InitOVN(); err != nil {
×
1042
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1043
        }
×
1044

1045
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1046
        if err := c.syncIPCR(); err != nil {
×
1047
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1048
        }
×
1049

1050
        if err := c.syncFinalizers(); err != nil {
×
1051
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1052
        }
×
1053

1054
        if err := c.InitIPAM(); err != nil {
×
1055
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1056
        }
×
1057

1058
        if err := c.syncNodeRoutes(); err != nil {
×
1059
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1060
        }
×
1061

1062
        if err := c.syncSubnetCR(); err != nil {
×
1063
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1064
        }
×
1065

1066
        if err := c.syncVlanCR(); err != nil {
×
1067
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1068
        }
×
1069

1070
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1071
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1072
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1073
                }
×
1074
        }
1075

1076
        // start workers to do all the network operations
1077
        c.startWorkers(ctx)
×
1078

×
1079
        c.initResourceOnce()
×
1080
        <-ctx.Done()
×
1081
        klog.Info("Shutting down workers")
×
1082

×
1083
        c.OVNNbClient.Close()
×
1084
        c.OVNSbClient.Close()
×
1085
}
1086

1087
func (c *Controller) dbStatus() {
×
1088
        const maxFailures = 5
×
1089

×
1090
        done := make(chan error, 2)
×
1091
        go func() {
×
1092
                done <- c.OVNNbClient.Echo(context.Background())
×
1093
        }()
×
1094
        go func() {
×
1095
                done <- c.OVNSbClient.Echo(context.Background())
×
1096
        }()
×
1097

1098
        resultsReceived := 0
×
1099
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1100

×
1101
        for resultsReceived < 2 {
×
1102
                select {
×
1103
                case err := <-done:
×
1104
                        resultsReceived++
×
1105
                        if err != nil {
×
1106
                                c.dbFailureCount++
×
1107
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1108
                                if c.dbFailureCount >= maxFailures {
×
1109
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1110
                                }
×
1111
                                return
×
1112
                        }
1113
                case <-timeout:
×
1114
                        c.dbFailureCount++
×
1115
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1116
                        if c.dbFailureCount >= maxFailures {
×
1117
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1118
                        }
×
1119
                        return
×
1120
                }
1121
        }
1122

1123
        if c.dbFailureCount > 0 {
×
1124
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1125
                c.dbFailureCount = 0
×
1126
        }
×
1127
}
1128

1129
func (c *Controller) shutdown() {
×
1130
        utilruntime.HandleCrash()
×
1131

×
1132
        c.addOrUpdatePodQueue.ShutDown()
×
1133
        c.deletePodQueue.ShutDown()
×
1134
        c.updatePodSecurityQueue.ShutDown()
×
1135

×
1136
        c.addNamespaceQueue.ShutDown()
×
1137

×
1138
        c.addOrUpdateSubnetQueue.ShutDown()
×
1139
        c.deleteSubnetQueue.ShutDown()
×
1140
        c.updateSubnetStatusQueue.ShutDown()
×
1141
        c.syncVirtualPortsQueue.ShutDown()
×
1142

×
1143
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1144
        c.updateIPPoolStatusQueue.ShutDown()
×
1145
        c.deleteIPPoolQueue.ShutDown()
×
1146

×
1147
        c.addNodeQueue.ShutDown()
×
1148
        c.updateNodeQueue.ShutDown()
×
1149
        c.deleteNodeQueue.ShutDown()
×
1150

×
1151
        c.addServiceQueue.ShutDown()
×
1152
        c.deleteServiceQueue.ShutDown()
×
1153
        c.updateServiceQueue.ShutDown()
×
1154
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1155

×
1156
        c.addVlanQueue.ShutDown()
×
1157
        c.delVlanQueue.ShutDown()
×
1158
        c.updateVlanQueue.ShutDown()
×
1159

×
1160
        c.addOrUpdateVpcQueue.ShutDown()
×
1161
        c.updateVpcStatusQueue.ShutDown()
×
1162
        c.delVpcQueue.ShutDown()
×
1163

×
1164
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1165
        c.initVpcNatGatewayQueue.ShutDown()
×
1166
        c.delVpcNatGatewayQueue.ShutDown()
×
1167
        c.updateVpcEipQueue.ShutDown()
×
1168
        c.updateVpcFloatingIPQueue.ShutDown()
×
1169
        c.updateVpcDnatQueue.ShutDown()
×
1170
        c.updateVpcSnatQueue.ShutDown()
×
1171
        c.updateVpcSubnetQueue.ShutDown()
×
1172

×
1173
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1174
        c.delVpcEgressGatewayQueue.ShutDown()
×
1175

×
1176
        if c.config.EnableLb {
×
1177
                c.addSwitchLBRuleQueue.ShutDown()
×
1178
                c.delSwitchLBRuleQueue.ShutDown()
×
1179
                c.updateSwitchLBRuleQueue.ShutDown()
×
1180

×
1181
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1182
                c.delVpcDNSQueue.ShutDown()
×
1183
        }
×
1184

1185
        c.addIPQueue.ShutDown()
×
1186
        c.updateIPQueue.ShutDown()
×
1187
        c.delIPQueue.ShutDown()
×
1188

×
1189
        c.addVirtualIPQueue.ShutDown()
×
1190
        c.updateVirtualIPQueue.ShutDown()
×
1191
        c.updateVirtualParentsQueue.ShutDown()
×
1192
        c.delVirtualIPQueue.ShutDown()
×
1193

×
1194
        c.addIptablesEipQueue.ShutDown()
×
1195
        c.updateIptablesEipQueue.ShutDown()
×
1196
        c.resetIptablesEipQueue.ShutDown()
×
1197
        c.delIptablesEipQueue.ShutDown()
×
1198

×
1199
        c.addIptablesFipQueue.ShutDown()
×
1200
        c.updateIptablesFipQueue.ShutDown()
×
1201
        c.delIptablesFipQueue.ShutDown()
×
1202

×
1203
        c.addIptablesDnatRuleQueue.ShutDown()
×
1204
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1205
        c.delIptablesDnatRuleQueue.ShutDown()
×
1206

×
1207
        c.addIptablesSnatRuleQueue.ShutDown()
×
1208
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1209
        c.delIptablesSnatRuleQueue.ShutDown()
×
1210

×
1211
        c.addQoSPolicyQueue.ShutDown()
×
1212
        c.updateQoSPolicyQueue.ShutDown()
×
1213
        c.delQoSPolicyQueue.ShutDown()
×
1214

×
1215
        c.addOvnEipQueue.ShutDown()
×
1216
        c.updateOvnEipQueue.ShutDown()
×
1217
        c.resetOvnEipQueue.ShutDown()
×
1218
        c.delOvnEipQueue.ShutDown()
×
1219

×
1220
        c.addOvnFipQueue.ShutDown()
×
1221
        c.updateOvnFipQueue.ShutDown()
×
1222
        c.delOvnFipQueue.ShutDown()
×
1223

×
1224
        c.addOvnSnatRuleQueue.ShutDown()
×
1225
        c.updateOvnSnatRuleQueue.ShutDown()
×
1226
        c.delOvnSnatRuleQueue.ShutDown()
×
1227

×
1228
        c.addOvnDnatRuleQueue.ShutDown()
×
1229
        c.updateOvnDnatRuleQueue.ShutDown()
×
1230
        c.delOvnDnatRuleQueue.ShutDown()
×
1231

×
1232
        if c.config.EnableNP {
×
1233
                c.updateNpQueue.ShutDown()
×
1234
                c.deleteNpQueue.ShutDown()
×
1235
        }
×
1236
        if c.config.EnableANP {
×
1237
                c.addAnpQueue.ShutDown()
×
1238
                c.updateAnpQueue.ShutDown()
×
1239
                c.deleteAnpQueue.ShutDown()
×
1240

×
1241
                c.addBanpQueue.ShutDown()
×
1242
                c.updateBanpQueue.ShutDown()
×
1243
                c.deleteBanpQueue.ShutDown()
×
1244

×
1245
                c.addCnpQueue.ShutDown()
×
1246
                c.updateCnpQueue.ShutDown()
×
1247
                c.deleteCnpQueue.ShutDown()
×
1248
        }
×
1249

1250
        if c.config.EnableDNSNameResolver {
×
1251
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1252
                c.deleteDNSNameResolverQueue.ShutDown()
×
1253
        }
×
1254

1255
        c.addOrUpdateSgQueue.ShutDown()
×
1256
        c.delSgQueue.ShutDown()
×
1257
        c.syncSgPortsQueue.ShutDown()
×
1258

×
1259
        c.addOrUpdateCsrQueue.ShutDown()
×
1260

×
1261
        if c.config.EnableLiveMigrationOptimize {
×
1262
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1263
        }
×
1264
}
1265

1266
func (c *Controller) startWorkers(ctx context.Context) {
×
1267
        klog.Info("Starting workers")
×
1268

×
1269
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1270
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1272

×
1273
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1274
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1276
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1277
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1278
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1280
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1281
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1282
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1283
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1284
        // add default and join subnet and wait them ready
×
1285
        for range c.config.WorkerNum {
×
1286
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1287
        }
×
1288
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1289
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1290
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1291
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1292
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1293
                klog.Infof("wait for subnets %v ready", subnets)
×
1294

×
1295
                return c.allSubnetReady(subnets...)
×
1296
        })
×
1297
        if err != nil {
×
1298
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1299
        }
×
1300

1301
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1302
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1303
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1304

×
1305
        // run node worker before handle any pods
×
1306
        for range c.config.WorkerNum {
×
1307
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1308
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1309
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1310
        }
×
1311
        for {
×
1312
                ready := true
×
1313
                time.Sleep(3 * time.Second)
×
1314
                nodes, err := c.nodesLister.List(labels.Everything())
×
1315
                if err != nil {
×
1316
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1317
                }
×
1318
                for _, node := range nodes {
×
1319
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1320
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1321
                                ready = false
×
1322
                                break
×
1323
                        }
1324
                }
1325
                if ready {
×
1326
                        break
×
1327
                }
1328
        }
1329

1330
        if c.config.EnableLb {
×
1331
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1332
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1333
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1334

×
1335
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1336
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1337
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1338

×
1339
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1340
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1341
                go wait.Until(func() {
×
1342
                        c.resyncVpcDNSConfig()
×
1343
                }, 5*time.Second, ctx.Done())
×
1344
        }
1345

1346
        for range c.config.WorkerNum {
×
1347
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1348
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1349
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1350

×
1351
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1352
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1353
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1354
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1355
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1356

×
1357
                if c.config.EnableLb {
×
1358
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1359
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1360
                }
×
1361

1362
                if c.config.EnableNP {
×
1363
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1364
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1365
                }
×
1366

1367
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1368
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1369
        }
1370

1371
        if c.config.EnableEipSnat {
×
1372
                go wait.Until(func() {
×
1373
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1374
                        c.resyncExternalGateway()
×
1375
                }, time.Second, ctx.Done())
×
1376

1377
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1378
                c.OVNNbClient.MonitorBFD()
×
1379
        }
1380
        // TODO: we should merge these two vpc nat config into one config and resync them together
1381
        go wait.Until(func() {
×
1382
                c.resyncVpcNatGwConfig()
×
1383
        }, time.Second, ctx.Done())
×
1384

1385
        go wait.Until(func() {
×
1386
                c.resyncVpcNatConfig()
×
1387
        }, time.Second, ctx.Done())
×
1388

1389
        if c.config.GCInterval != 0 {
×
1390
                go wait.Until(func() {
×
1391
                        if err := c.markAndCleanLSP(); err != nil {
×
1392
                                klog.Errorf("gc lsp error: %v", err)
×
1393
                        }
×
1394
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1395
        }
1396

1397
        go wait.Until(func() {
×
1398
                if err := c.inspectPod(); err != nil {
×
1399
                        klog.Errorf("inspection error: %v", err)
×
1400
                }
×
1401
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1402

1403
        if c.config.EnableExternalVpc {
×
1404
                go wait.Until(func() {
×
1405
                        c.syncExternalVpc()
×
1406
                }, 5*time.Second, ctx.Done())
×
1407
        }
1408

1409
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1410
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1411
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1412
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1413

×
1414
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1415
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1416
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1417
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1418

×
1419
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1420
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1421
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1422

×
1423
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1424
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1425
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1426

×
1427
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1428
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1429
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1430

×
1431
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1432

×
1433
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1434
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1436

×
1437
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1438
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1439
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1441

×
1442
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1443
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1444
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1446

×
1447
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1448
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1450

×
1451
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1452
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1454

×
1455
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1456
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1457
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1458

×
1459
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1460
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1461
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1462

×
1463
        if c.config.EnableANP {
×
1464
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1465
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1467

×
1468
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1469
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1470
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1471

×
1472
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1473
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1474
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1475
        }
×
1476

1477
        if c.config.EnableDNSNameResolver {
×
1478
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1479
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1480
        }
×
1481

1482
        if c.config.EnableLiveMigrationOptimize {
×
1483
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1484
        }
×
1485

1486
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1487

×
1488
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1489
}
1490

1491
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1492
        for _, lsName := range subnets {
2✔
1493
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1494
                if err != nil {
1✔
1495
                        klog.Error(err)
×
1496
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1497
                }
×
1498

1499
                if !exist {
2✔
1500
                        return false, nil
1✔
1501
                }
1✔
1502
        }
1503

1504
        return true, nil
1✔
1505
}
1506

1507
func (c *Controller) initResourceOnce() {
×
1508
        c.registerSubnetMetrics()
×
1509

×
1510
        if err := c.initNodeChassis(); err != nil {
×
1511
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1512
        }
×
1513

1514
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1515
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1516
        }
×
1517
        if err := c.syncSecurityGroup(); err != nil {
×
1518
                util.LogFatalAndExit(err, "failed to sync security group")
×
1519
        }
×
1520

1521
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1522
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1523
        }
×
1524

1525
        if err := c.initVpcNatGw(); err != nil {
×
1526
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1527
        }
×
1528
        if c.config.EnableLb {
×
1529
                if err := c.initVpcDNSConfig(); err != nil {
×
1530
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1531
                }
×
1532
        }
1533

1534
        // remove resources in ovndb that not exist any more in kubernetes resources
1535
        // process gc at last in case of affecting other init process
1536
        if err := c.gc(); err != nil {
×
1537
                util.LogFatalAndExit(err, "failed to run gc")
×
1538
        }
×
1539
}
1540

1541
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1542
        item, shutdown := queue.Get()
×
1543
        if shutdown {
×
1544
                return false
×
1545
        }
×
1546

1547
        err := func(item T) error {
×
1548
                defer queue.Done(item)
×
1549
                if err := handler(item); err != nil {
×
1550
                        queue.AddRateLimited(item)
×
1551
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1552
                }
×
1553
                queue.Forget(item)
×
1554
                return nil
×
1555
        }(item)
1556
        if err != nil {
×
1557
                utilruntime.HandleError(err)
×
1558
                return true
×
1559
        }
×
1560
        return true
×
1561
}
1562

1563
func getWorkItemKey(obj any) string {
×
1564
        switch v := obj.(type) {
×
1565
        case string:
×
1566
                return v
×
1567
        case *vpcService:
×
1568
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1569
        case *AdminNetworkPolicyChangedDelta:
×
1570
                return v.key
×
1571
        case *SlrInfo:
×
1572
                return v.Name
×
1573
        default:
×
1574
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1575
                if err != nil {
×
1576
                        utilruntime.HandleError(err)
×
1577
                        return ""
×
1578
                }
×
1579
                return key
×
1580
        }
1581
}
1582

1583
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1584
        return func() {
×
1585
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1586
                }
×
1587
        }
1588
}
1589

1590
// apiResourceExists checks if all specified kinds exist in the given group version.
1591
// It returns true if all kinds are found, false otherwise.
1592
// Parameters:
1593
// - discoveryClient: The discovery client to use for querying API resources.
1594
// - gv: The group version string (e.g., "apps/v1").
1595
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1596
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1597
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1598
        if err != nil {
×
1599
                if k8serrors.IsNotFound(err) {
×
1600
                        return false, nil
×
1601
                }
×
1602
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1603
        }
1604

1605
        existingKinds := set.New[string]()
×
1606
        for _, apiResource := range apiResourceLists.APIResources {
×
1607
                existingKinds.Insert(apiResource.Kind)
×
1608
        }
×
1609

1610
        return existingKinds.HasAll(kinds...), nil
×
1611
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc