• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 24622570643

19 Apr 2026 06:12AM UTC coverage: 24.56% (+0.02%) from 24.541%
24622570643

push

github

web-flow
perf(controller): use informer indexers for hot-path lookups (#6646)

* perf(controller): use informer indexers for hot-path lookups

Replace full-store linear scans in two hot paths with O(1) indexer
lookups:

- checkAndUpdateNodePortGroup previously iterated every Pod for every
  Node (O(N_node × N_pod)). It now fetches pods by node via a
  byNodeName indexer. The cluster-wide NetworkPolicy existence check
  also switches from a full Lister.List to indexer ListKeys.
- findEndpointSlicesForServices previously listed every EndpointSlice
  in the namespace and filtered by label in a nested loop. It now
  queries a byServiceName indexer keyed on namespace/name.

Introduces pkg/controller/indexers.go as the first custom informer
indexer wiring in the controller package, registered before informer
startup, so future hot paths can reuse the same infrastructure.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* perf(controller): address review feedback on indexer wiring

- Use util.LogFatalAndExit for setupIndexers failure to match the
  rest of the Run() startup path.
- Reuse Controller.setupIndexers in the fake controller instead of
  manually calling AddIndexers, so tests exercise the same wiring as
  production and automatically pick up new indexers.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* test(controller): add benchmarks comparing indexer lookup vs full scan

TestIndexersResultParityWithFullScan confirms the indexer lookup returns
the same object set as the pre-indexer full scan + filter across every
key, including unscheduled pods and orphan endpointslices.

BenchmarkPodByNode and BenchmarkEPSByService quantify the improvement
at 10k objects:
- PodByNode: ~95x faster (2.5us vs 238us) / ~1... (continued)

28 of 50 new or added lines in 4 files covered. (56.0%)

3 existing lines in 2 files now uncovered.

13850 of 56393 relevant lines covered (24.56%)

0.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.13
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/discovery"
23
        kubeinformers "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        appsv1 "k8s.io/client-go/listers/apps/v1"
27
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
28
        v1 "k8s.io/client-go/listers/core/v1"
29
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
30
        netv1 "k8s.io/client-go/listers/networking/v1"
31
        "k8s.io/client-go/tools/cache"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        "k8s.io/klog/v2"
35
        "k8s.io/utils/keymutex"
36
        "k8s.io/utils/set"
37
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
38
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
39
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
40
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
41
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
42

43
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
44
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
45
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
46
        "github.com/kubeovn/kube-ovn/pkg/informer"
47
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
48
        "github.com/kubeovn/kube-ovn/pkg/ovs"
49
        "github.com/kubeovn/kube-ovn/pkg/util"
50
)
51

52
// controllerAgentName is the component name Run uses as the event source
// when it creates the Kubernetes event recorder.
const controllerAgentName = "kube-ovn-controller"
53

54
// Short string keys, one per resource kind named by the identifier.
// NOTE(review): the consumers of these keys are outside this view; presumably
// they are used as lookup/external-ID style keys - confirm at the call sites.
const (
55
        logicalSwitchKey              = "ls"
56
        logicalRouterKey              = "lr"
57
        portGroupKey                  = "pg"
58
        networkPolicyKey              = "np"
59
        sgKey                         = "sg"
60
        sgsKey                        = "security_groups"
61
        u2oKey                        = "u2o"
62
        adminNetworkPolicyKey         = "anp"
63
        baselineAdminNetworkPolicyKey = "banp"
64
        ippoolKey                     = "ippool"
65
        clusterNetworkPolicyKey       = "cnp"
66
)
67

68
// Controller is the kube-ovn main controller: it watches Kubernetes and
// kube-ovn resources (namespaces, pods, nodes, services, endpoint slices, ...)
// and operates OVN accordingly.
//
// Most field groups below follow the same layout: a lister, an
// informer-synced check, zero or more rate-limited work queues and,
// where present, a per-key mutex.
69
type Controller struct {
70
        // config is the Configuration supplied to Run.
        config *Configuration
71

72
        // IPAM, named-port bookkeeping and priority<->name maps for ANP/BANP.
        // NOTE(review): priorityMapMutex presumably guards the four maps - confirm.
        ipam             *ovnipam.IPAM
73
        namedPort        *NamedPort
74
        anpPrioNameMap   map[int32]string
75
        anpNamePrioMap   map[string]int32
76
        bnpPrioNameMap   map[int32]string
77
        bnpNamePrioMap   map[string]int32
78
        priorityMapMutex sync.RWMutex
79

80
        // Clients for the OVN northbound (Nb) and southbound (Sb) databases.
        OVNNbClient ovs.NbClient
81
        OVNSbClient ovs.SbClient
82

83
        // ExternalGatewayType defines the external gateway type (e.g. centralized).
84
        ExternalGatewayType string
85

86
        // Pods: lister, sync check, indexer, work queues and per-key mutex.
        // NOTE(review): deletingPodObjMap/deletingNodeObjMap presumably hold
        // objects whose deletion is still being processed - confirm.
        podsLister             v1.PodLister
87
        podsSynced             cache.InformerSynced
88
        podIndexer             cache.Indexer
89
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
90
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
91
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
92
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
93
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
94
        podKeyMutex            keymutex.KeyMutex
95

96
        // VPCs.
        vpcsLister           kubeovnlister.VpcLister
97
        vpcSynced            cache.InformerSynced
98
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
99
        vpcLastPoliciesMap   *xsync.Map[string, string]
100
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
101
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
102
        vpcKeyMutex          keymutex.KeyMutex
103

104
        // VPC NAT gateways.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
105
        vpcNatGatewaySynced           cache.InformerSynced
106
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
107
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
108
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
109
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
110
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
111
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
112
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
113
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
114
        vpcNatGwKeyMutex              keymutex.KeyMutex
115
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
116

117
        // VPC egress gateways.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
118
        vpcEgressGatewaySynced           cache.InformerSynced
119
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
120
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
121
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
122

123
        // BGP and EVPN configuration resources (listers and sync checks only).
        bgpConfLister  kubeovnlister.BgpConfLister
124
        bgpConfSynced  cache.InformerSynced
125
        evpnConfLister kubeovnlister.EvpnConfLister
126
        evpnConfSynced cache.InformerSynced
127

128
        // Switch LB rules.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
129
        switchLBRuleSynced      cache.InformerSynced
130
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
131
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
132
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
133

134
        // VPC DNS.
        vpcDNSLister           kubeovnlister.VpcDnsLister
135
        vpcDNSSynced           cache.InformerSynced
136
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
137
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
138

139
        // Subnets.
        subnetsLister           kubeovnlister.SubnetLister
140
        subnetSynced            cache.InformerSynced
141
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
142
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
143
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
144
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
145
        subnetKeyMutex          keymutex.KeyMutex
146

147
        // IP pools.
        ippoolLister            kubeovnlister.IPPoolLister
148
        ippoolSynced            cache.InformerSynced
149
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
150
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
151
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
152
        ippoolKeyMutex          keymutex.KeyMutex
153

154
        // IPs.
        ipsLister     kubeovnlister.IPLister
155
        ipSynced      cache.InformerSynced
156
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
157
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
158
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
159

160
        // Virtual IPs (Vip resources).
        virtualIpsLister          kubeovnlister.VipLister
161
        virtualIpsSynced          cache.InformerSynced
162
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
163
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
164
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
165
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
166

167
        // Iptables EIPs.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
168
        iptablesEipSynced      cache.InformerSynced
169
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
170
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
171
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
172
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
173

174
        // Iptables FIP rules.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
175
        iptablesFipSynced      cache.InformerSynced
176
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
177
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
178
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
179

180
        // Iptables DNAT rules.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
181
        iptablesDnatRuleSynced      cache.InformerSynced
182
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
183
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
184
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
185

186
        // Iptables SNAT rules.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
187
        iptablesSnatRuleSynced      cache.InformerSynced
188
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
189
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
190
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
191

192
        // OVN EIPs.
        ovnEipsLister     kubeovnlister.OvnEipLister
193
        ovnEipSynced      cache.InformerSynced
194
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
195
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
196
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
197
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
198

199
        // OVN FIPs.
        ovnFipsLister     kubeovnlister.OvnFipLister
200
        ovnFipSynced      cache.InformerSynced
201
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
202
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
203
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
204

205
        // OVN SNAT rules.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
206
        ovnSnatRuleSynced      cache.InformerSynced
207
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
208
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
209
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
210

211
        // OVN DNAT rules.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
212
        ovnDnatRuleSynced      cache.InformerSynced
213
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
214
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
215
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
216

217
        // Provider networks (lister and sync check only).
        providerNetworksLister kubeovnlister.ProviderNetworkLister
218
        providerNetworkSynced  cache.InformerSynced
219

220
        // VLANs.
        vlansLister     kubeovnlister.VlanLister
221
        vlanSynced      cache.InformerSynced
222
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
223
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
224
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
225
        vlanKeyMutex    keymutex.KeyMutex
226

227
        // Namespaces.
        namespacesLister  v1.NamespaceLister
228
        namespacesSynced  cache.InformerSynced
229
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
230
        nsKeyMutex        keymutex.KeyMutex
231

232
        // Nodes.
        nodesLister     v1.NodeLister
233
        nodesSynced     cache.InformerSynced
234
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
235
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
236
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
237
        nodeKeyMutex    keymutex.KeyMutex
238

239
        // Services.
        servicesLister     v1.ServiceLister
240
        serviceSynced      cache.InformerSynced
241
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
242
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
243
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
244
        svcKeyMutex        keymutex.KeyMutex
245

246
        // EndpointSlices, including an indexer over the informer store.
        endpointSlicesLister          discoveryv1.EndpointSliceLister
247
        endpointSlicesSynced          cache.InformerSynced
248
        epsIndexer                    cache.Indexer
249
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
250
        epKeyMutex                    keymutex.KeyMutex
251

252
        // Deployments (lister and sync check only).
        deploymentsLister appsv1.DeploymentLister
253
        deploymentsSynced cache.InformerSynced
254

255
        // Kubernetes NetworkPolicies, including an indexer over the informer store.
        npsLister     netv1.NetworkPolicyLister
256
        npsSynced     cache.InformerSynced
257
        npIndexer     cache.Indexer
258
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
259
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
260
        npKeyMutex    keymutex.KeyMutex
261

262
        // Security groups.
        sgsLister          kubeovnlister.SecurityGroupLister
263
        sgSynced           cache.InformerSynced
264
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
265
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
266
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
267
        sgKeyMutex         keymutex.KeyMutex
268

269
        // QoS policies.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
270
        qosPolicySynced      cache.InformerSynced
271
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
272
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
273
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
274

275
        // ConfigMaps (lister and sync check only).
        configMapsLister v1.ConfigMapLister
276
        configMapsSynced cache.InformerSynced
277

278
        // AdminNetworkPolicies (v1alpha1).
        anpsLister     anplister.AdminNetworkPolicyLister
279
        anpsSynced     cache.InformerSynced
280
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
281
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
282
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
283
        anpKeyMutex    keymutex.KeyMutex
284

285
        // DNS name resolvers.
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
286
        dnsNameResolversSynced          cache.InformerSynced
287
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
288
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
289

290
        // BaselineAdminNetworkPolicies (v1alpha1).
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
291
        banpsSynced     cache.InformerSynced
292
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
293
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
294
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
295
        banpKeyMutex    keymutex.KeyMutex
296

297
        // ClusterNetworkPolicies (v1alpha2).
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
298
        cnpsSynced     cache.InformerSynced
299
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
300
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
301
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
302
        cnpKeyMutex    keymutex.KeyMutex
303

304
        // CertificateSigningRequests.
        csrLister           certListerv1.CertificateSigningRequestLister
305
        csrSynced           cache.InformerSynced
306
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
307

308
        // KubeVirt: VMI migration / VM deletion queues and the informer factory.
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
309
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
310
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
311

312
        // NetworkAttachmentDefinitions.
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
313
        netAttachSynced          cache.InformerSynced
314
        netAttachInformerFactory netAttach.SharedInformerFactory
315

316
        // Event recorder and the shared informer factories kept by the controller.
        recorder               record.EventRecorder
317
        informerFactory        kubeinformers.SharedInformerFactory
318
        cmInformerFactory      kubeinformers.SharedInformerFactory
319
        deployInformerFactory  kubeinformers.SharedInformerFactory
320
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
321
        anpInformerFactory     anpinformer.SharedInformerFactory
322

323
        // Database health check
324
        dbFailureCount int
325

326
        // NOTE(review): presumably flags that distributed subnets need a resync;
        // the readers/writers of this flag are outside this view - confirm.
        distributedSubnetNeedSync atomic.Bool
327
}
328

329
// newTypedRateLimitingQueue returns a typed rate-limiting work queue whose
// queue config carries the given name.
//
// If rateLimiter is nil, workqueue.DefaultTypedControllerRateLimiter[T]() is
// used instead, so callers may pass nil to get the default limiter.
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
330
        if rateLimiter == nil {
2✔
331
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
332
        }
1✔
333
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
334
}
335

336
// Run creates and runs a new ovn controller
337
func Run(ctx context.Context, config *Configuration) {
×
338
        klog.V(4).Info("Creating event broadcaster")
×
339
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
340
        eventBroadcaster.StartLogging(klog.Infof)
×
341
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
342
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
343
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
344
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
345
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
346
        )
×
347

×
348
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
349
        if err != nil {
×
350
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
351
        }
×
352

353
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
354
                kubeinformers.WithTransform(util.TrimManagedFields),
×
355
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
356
                        listOption.AllowWatchBookmarks = true
×
357
                }))
×
358
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
359
                kubeinformers.WithNamespace(config.PodNamespace),
×
360
                kubeinformers.WithTransform(util.TrimManagedFields),
×
361
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
362
                        listOption.AllowWatchBookmarks = true
×
363
                }))
×
364
        // deployment informer used to list/watch vpc egress gateway workloads
365
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
366
                kubeinformers.WithTransform(util.TrimManagedFields),
×
367
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
368
                        listOption.AllowWatchBookmarks = true
×
369
                        listOption.LabelSelector = selector.String()
×
370
                }))
×
371
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
372
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
373
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
374
                        listOption.AllowWatchBookmarks = true
×
375
                }))
×
376
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
377
                anpinformer.WithTransform(util.TrimManagedFields),
×
378
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
379
                        listOption.AllowWatchBookmarks = true
×
380
                }))
×
381
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
382
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
383
                        listOption.AllowWatchBookmarks = true
×
384
                }),
×
385
        )
386
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
387
                informer.WithTransform(util.TrimManagedFields),
×
388
        )
×
389

×
390
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
391
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
392
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
393
        bgpConfInformer := kubeovnInformerFactory.Kubeovn().V1().BgpConves()
×
394
        evpnConfInformer := kubeovnInformerFactory.Kubeovn().V1().EvpnConves()
×
395
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
396
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
397
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
398
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
399
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
400
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
401
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
402
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
403
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
404
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
405
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
406
        podInformer := informerFactory.Core().V1().Pods()
×
407
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
408
        nodeInformer := informerFactory.Core().V1().Nodes()
×
409
        serviceInformer := informerFactory.Core().V1().Services()
×
410
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
411
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
412
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
413
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
414
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
415
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
416
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
417
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
418
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
419
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
420
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
421
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
422
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
423
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
424
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
425
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
426
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
427

×
428
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
429
        controller := &Controller{
×
430
                config:             config,
×
431
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
432
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
433
                ipam:               ovnipam.NewIPAM(),
×
434
                namedPort:          NewNamedPort(),
×
435

×
436
                vpcsLister:           vpcInformer.Lister(),
×
437
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
438
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
439
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
440
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
441
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
442
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
443

×
444
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
445
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
446
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
447
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
448
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
449
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
450
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
451
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
452
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
453
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
454
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
455
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
456
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
457
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
458
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
459
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
460
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
461

×
462
                bgpConfLister:  bgpConfInformer.Lister(),
×
463
                bgpConfSynced:  bgpConfInformer.Informer().HasSynced,
×
464
                evpnConfLister: evpnConfInformer.Lister(),
×
465
                evpnConfSynced: evpnConfInformer.Informer().HasSynced,
×
466

×
467
                subnetsLister:           subnetInformer.Lister(),
×
468
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
469
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
470
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
471
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
472
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
473
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
474

×
475
                ippoolLister:            ippoolInformer.Lister(),
×
476
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
477
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
478
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
479
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
480
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
481

×
482
                ipsLister:     ipInformer.Lister(),
×
483
                ipSynced:      ipInformer.Informer().HasSynced,
×
484
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
485
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
486
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
487

×
488
                virtualIpsLister:          virtualIPInformer.Lister(),
×
489
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
490
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
491
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
492
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
493
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
494

×
495
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
496
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
497
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
498
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
499
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
500
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
501

×
502
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
503
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
504
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
505
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
506
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
507

×
508
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
509
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
510
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
511
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
512
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
513

×
514
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
515
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
516
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
517
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
518
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
519

×
520
                vlansLister:     vlanInformer.Lister(),
×
521
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
522
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
523
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
524
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
525
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
526

×
527
                providerNetworksLister: providerNetworkInformer.Lister(),
×
528
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
529

×
530
                podsLister:          podInformer.Lister(),
×
531
                podsSynced:          podInformer.Informer().HasSynced,
×
532
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
533
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
534
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
535
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
536
                                Name:          "DeletePod",
×
537
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
538
                        },
×
539
                ),
×
540
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
541
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
542

×
543
                namespacesLister:  namespaceInformer.Lister(),
×
544
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
545
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
546
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
547

×
548
                nodesLister:     nodeInformer.Lister(),
×
549
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
550
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
551
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
552
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
553
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
554

×
555
                servicesLister:     serviceInformer.Lister(),
×
556
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
557
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
558
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
559
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
560
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
561

×
562
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
563
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
564
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
565
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
566

×
567
                deploymentsLister: deploymentInformer.Lister(),
×
568
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
569

×
570
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
571
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
572
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
573
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
574
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
575

×
576
                configMapsLister: configMapInformer.Lister(),
×
577
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
578

×
579
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
580
                sgsLister:          sgInformer.Lister(),
×
581
                sgSynced:           sgInformer.Informer().HasSynced,
×
582
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
583
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
584
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
585

×
586
                ovnEipsLister:     ovnEipInformer.Lister(),
×
587
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
588
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
589
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
590
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
591
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
592

×
593
                ovnFipsLister:     ovnFipInformer.Lister(),
×
594
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
595
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
596
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
597
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
598

×
599
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
600
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
601
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
602
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
603
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
604

×
605
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
606
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
607
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
608
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
609
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
610

×
611
                csrLister:           csrInformer.Lister(),
×
612
                csrSynced:           csrInformer.Informer().HasSynced,
×
613
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
614

×
615
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
616
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
617
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
618

×
619
                netAttachLister:          netAttachInformer.Lister(),
×
620
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
621
                netAttachInformerFactory: attachNetInformerFactory,
×
622

×
623
                recorder:               recorder,
×
624
                informerFactory:        informerFactory,
×
625
                cmInformerFactory:      cmInformerFactory,
×
626
                deployInformerFactory:  deployInformerFactory,
×
627
                kubeovnInformerFactory: kubeovnInformerFactory,
×
628
                anpInformerFactory:     anpInformerFactory,
×
629
        }
×
630

×
631
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
632
                config.OvnNbAddr,
×
633
                config.OvnTimeout,
×
634
                config.OvsDbConnectTimeout,
×
635
                config.OvsDbInactivityTimeout,
×
636
                config.OvsDbConnectMaxRetry,
×
637
        ); err != nil {
×
638
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
639
        }
×
640
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
641
                config.OvnSbAddr,
×
642
                config.OvnTimeout,
×
643
                config.OvsDbConnectTimeout,
×
644
                config.OvsDbInactivityTimeout,
×
645
                config.OvsDbConnectMaxRetry,
×
646
        ); err != nil {
×
647
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
648
        }
×
649
        if config.EnableLb {
×
650
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
651
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
652
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
653
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
654
                        "DeleteSwitchLBRule",
×
655
                        workqueue.NewTypedMaxOfRateLimiter(
×
656
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
657
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
658
                        ),
×
659
                )
×
660
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
661
                        "UpdateSwitchLBRule",
×
662
                        workqueue.NewTypedMaxOfRateLimiter(
×
663
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
664
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
665
                        ),
×
666
                )
×
667

×
668
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
669
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
670
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
671
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
672
        }
×
673

674
        if config.EnableNP {
×
675
                controller.npsLister = npInformer.Lister()
×
676
                controller.npsSynced = npInformer.Informer().HasSynced
×
NEW
677
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
678
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
679
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
680
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
681
        }
×
682

683
        if config.EnableANP {
×
684
                controller.anpsLister = anpInformer.Lister()
×
685
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
686
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
687
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
688
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
689
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
690

×
691
                controller.banpsLister = banpInformer.Lister()
×
692
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
693
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
694
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
695
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
696
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
697

×
698
                controller.cnpsLister = cnpInformer.Lister()
×
699
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
700
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
701
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
702
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
703
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
704
        }
×
705

706
        if config.EnableDNSNameResolver {
×
707
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
708
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
709
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
710
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
711
        }
×
712

NEW
713
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer()); err != nil {
×
NEW
714
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
NEW
715
        }
×
716

717
        defer controller.shutdown()
×
718
        klog.Info("Starting OVN controller")
×
719

×
720
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
721
        // NAD CRD is optional, so we check if it exists before starting the informer
×
722
        controller.StartNetAttachInformerFactory(ctx)
×
723

×
724
        // Wait for the caches to be synced before starting workers
×
725
        controller.informerFactory.Start(ctx.Done())
×
726
        controller.cmInformerFactory.Start(ctx.Done())
×
727
        controller.deployInformerFactory.Start(ctx.Done())
×
728
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
729
        controller.anpInformerFactory.Start(ctx.Done())
×
730
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
731

×
732
        klog.Info("Waiting for informer caches to sync")
×
733
        cacheSyncs := []cache.InformerSynced{
×
734
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
735
                controller.bgpConfSynced, controller.evpnConfSynced,
×
736
                controller.vpcSynced, controller.subnetSynced,
×
737
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
738
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
739
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
740
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
741
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
742
                controller.ovnDnatRuleSynced,
×
743
        }
×
744
        if controller.config.EnableLb {
×
745
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
746
        }
×
747
        if controller.config.EnableNP {
×
748
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
749
        }
×
750
        if controller.config.EnableANP {
×
751
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
752
        }
×
753
        if controller.config.EnableDNSNameResolver {
×
754
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
755
        }
×
756

757
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
758
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
759
        }
×
760

761
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
762
                AddFunc:    controller.enqueueAddPod,
×
763
                DeleteFunc: controller.enqueueDeletePod,
×
764
                UpdateFunc: controller.enqueueUpdatePod,
×
765
        }); err != nil {
×
766
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
767
        }
×
768

769
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
770
                AddFunc:    controller.enqueueAddNamespace,
×
771
                UpdateFunc: controller.enqueueUpdateNamespace,
×
772
                DeleteFunc: controller.enqueueDeleteNamespace,
×
773
        }); err != nil {
×
774
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
775
        }
×
776

777
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
778
                AddFunc:    controller.enqueueAddNode,
×
779
                UpdateFunc: controller.enqueueUpdateNode,
×
780
                DeleteFunc: controller.enqueueDeleteNode,
×
781
        }); err != nil {
×
782
                util.LogFatalAndExit(err, "failed to add node event handler")
×
783
        }
×
784

785
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
786
                AddFunc:    controller.enqueueAddService,
×
787
                DeleteFunc: controller.enqueueDeleteService,
×
788
                UpdateFunc: controller.enqueueUpdateService,
×
789
        }); err != nil {
×
790
                util.LogFatalAndExit(err, "failed to add service event handler")
×
791
        }
×
792

793
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
794
                AddFunc:    controller.enqueueAddEndpointSlice,
×
795
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
796
        }); err != nil {
×
797
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
798
        }
×
799

800
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
801
                AddFunc:    controller.enqueueAddDeployment,
×
802
                UpdateFunc: controller.enqueueUpdateDeployment,
×
803
        }); err != nil {
×
804
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
805
        }
×
806

807
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
808
                AddFunc:    controller.enqueueAddVpc,
×
809
                UpdateFunc: controller.enqueueUpdateVpc,
×
810
                DeleteFunc: controller.enqueueDelVpc,
×
811
        }); err != nil {
×
812
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
813
        }
×
814

815
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
816
                AddFunc:    controller.enqueueAddVpcNatGw,
×
817
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
818
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
821
        }
×
822

823
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
825
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
826
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
829
        }
×
830

831
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddSubnet,
×
833
                UpdateFunc: controller.enqueueUpdateSubnet,
×
834
                DeleteFunc: controller.enqueueDeleteSubnet,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
837
        }
×
838

839
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddIPPool,
×
841
                UpdateFunc: controller.enqueueUpdateIPPool,
×
842
                DeleteFunc: controller.enqueueDeleteIPPool,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
845
        }
×
846

847
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddIP,
×
849
                UpdateFunc: controller.enqueueUpdateIP,
×
850
                DeleteFunc: controller.enqueueDelIP,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
853
        }
×
854

855
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                AddFunc:    controller.enqueueAddVlan,
×
857
                DeleteFunc: controller.enqueueDelVlan,
×
858
                UpdateFunc: controller.enqueueUpdateVlan,
×
859
        }); err != nil {
×
860
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
861
        }
×
862

863
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                AddFunc:    controller.enqueueAddSg,
×
865
                DeleteFunc: controller.enqueueDeleteSg,
×
866
                UpdateFunc: controller.enqueueUpdateSg,
×
867
        }); err != nil {
×
868
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
869
        }
×
870

871
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
872
                AddFunc:    controller.enqueueAddVirtualIP,
×
873
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
874
                DeleteFunc: controller.enqueueDelVirtualIP,
×
875
        }); err != nil {
×
876
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
877
        }
×
878

879
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
880
                AddFunc:    controller.enqueueAddIptablesEip,
×
881
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
882
                DeleteFunc: controller.enqueueDelIptablesEip,
×
883
        }); err != nil {
×
884
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
885
        }
×
886

887
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
888
                AddFunc:    controller.enqueueAddIptablesFip,
×
889
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
890
                DeleteFunc: controller.enqueueDelIptablesFip,
×
891
        }); err != nil {
×
892
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
893
        }
×
894

895
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
896
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
897
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
898
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
899
        }); err != nil {
×
900
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
901
        }
×
902

903
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
904
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
905
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
906
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
907
        }); err != nil {
×
908
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
909
        }
×
910

911
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
912
                AddFunc:    controller.enqueueAddOvnEip,
×
913
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
914
                DeleteFunc: controller.enqueueDelOvnEip,
×
915
        }); err != nil {
×
916
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
917
        }
×
918

919
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
920
                AddFunc:    controller.enqueueAddOvnFip,
×
921
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
922
                DeleteFunc: controller.enqueueDelOvnFip,
×
923
        }); err != nil {
×
924
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
925
        }
×
926

927
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
928
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
929
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
930
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
931
        }); err != nil {
×
932
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
933
        }
×
934

935
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
936
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
937
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
938
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
939
        }); err != nil {
×
940
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
941
        }
×
942

943
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
944
                AddFunc:    controller.enqueueAddQoSPolicy,
×
945
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
946
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
947
        }); err != nil {
×
948
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
949
        }
×
950

951
        if config.EnableLb {
×
952
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
953
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
954
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
955
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
956
                }); err != nil {
×
957
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
958
                }
×
959

960
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
961
                        AddFunc:    controller.enqueueAddVpcDNS,
×
962
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
963
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
964
                }); err != nil {
×
965
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
966
                }
×
967
        }
968

969
        if config.EnableNP {
×
970
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
971
                        AddFunc:    controller.enqueueAddNp,
×
972
                        UpdateFunc: controller.enqueueUpdateNp,
×
973
                        DeleteFunc: controller.enqueueDeleteNp,
×
974
                }); err != nil {
×
975
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
976
                }
×
977
        }
978

979
        if config.EnableANP {
×
980
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
981
                        AddFunc:    controller.enqueueAddAnp,
×
982
                        UpdateFunc: controller.enqueueUpdateAnp,
×
983
                        DeleteFunc: controller.enqueueDeleteAnp,
×
984
                }); err != nil {
×
985
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
986
                }
×
987

988
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
989
                        AddFunc:    controller.enqueueAddBanp,
×
990
                        UpdateFunc: controller.enqueueUpdateBanp,
×
991
                        DeleteFunc: controller.enqueueDeleteBanp,
×
992
                }); err != nil {
×
993
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
994
                }
×
995

996
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
997
                        AddFunc:    controller.enqueueAddCnp,
×
998
                        UpdateFunc: controller.enqueueUpdateCnp,
×
999
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1000
                }); err != nil {
×
1001
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1002
                }
×
1003

1004
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1005
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1006
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1007
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1008
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1009
        }
1010

1011
        if config.EnableDNSNameResolver {
×
1012
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1013
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1014
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1015
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1016
                }); err != nil {
×
1017
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1018
                }
×
1019
        }
1020

1021
        if config.EnableOVNIPSec {
×
1022
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1023
                        AddFunc:    controller.enqueueAddCsr,
×
1024
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1025
                        // no need to add delete func for csr
×
1026
                }); err != nil {
×
1027
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1028
                }
×
1029
        }
1030

1031
        controller.Run(ctx)
×
1032
}
1033

1034
// Run will set up the event handlers for types we are interested in, as well
1035
// as syncing informer caches and starting workers. It will block until stopCh
1036
// is closed, at which point it will shutdown the workqueue and wait for
1037
// workers to finish processing their current work items.
1038
func (c *Controller) Run(ctx context.Context) {
×
1039
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1040
        // Otherwise, the init process should be placed after all workers have already started working
×
1041
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1042
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1043
        }
×
1044

1045
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1046
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1047
        }
×
1048

1049
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1050
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1051
        }
×
1052

1053
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1054
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1055
        }
×
1056

1057
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1058
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1059
        }
×
1060

1061
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1062
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1063
        }
×
1064

1065
        if err := c.InitOVN(); err != nil {
×
1066
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1067
        }
×
1068

1069
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1070
        if err := c.syncIPCR(); err != nil {
×
1071
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1072
        }
×
1073

1074
        if err := c.syncFinalizers(); err != nil {
×
1075
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1076
        }
×
1077

1078
        if err := c.InitIPAM(); err != nil {
×
1079
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1080
        }
×
1081

1082
        if err := c.syncNodeRoutes(); err != nil {
×
1083
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1084
        }
×
1085

1086
        if err := c.syncSubnetCR(); err != nil {
×
1087
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1088
        }
×
1089

1090
        if err := c.syncVlanCR(); err != nil {
×
1091
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1092
        }
×
1093

1094
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1095
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1096
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1097
                }
×
1098
        }
1099

1100
        // start workers to do all the network operations
1101
        c.startWorkers(ctx)
×
1102

×
1103
        c.initResourceOnce()
×
1104
        <-ctx.Done()
×
1105
        klog.Info("Shutting down workers")
×
1106

×
1107
        c.OVNNbClient.Close()
×
1108
        c.OVNSbClient.Close()
×
1109
}
1110

1111
func (c *Controller) dbStatus() {
×
1112
        const maxFailures = 5
×
1113

×
1114
        done := make(chan error, 2)
×
1115
        go func() {
×
1116
                done <- c.OVNNbClient.Echo(context.Background())
×
1117
        }()
×
1118
        go func() {
×
1119
                done <- c.OVNSbClient.Echo(context.Background())
×
1120
        }()
×
1121

1122
        resultsReceived := 0
×
1123
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1124

×
1125
        for resultsReceived < 2 {
×
1126
                select {
×
1127
                case err := <-done:
×
1128
                        resultsReceived++
×
1129
                        if err != nil {
×
1130
                                c.dbFailureCount++
×
1131
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1132
                                if c.dbFailureCount >= maxFailures {
×
1133
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1134
                                }
×
1135
                                return
×
1136
                        }
1137
                case <-timeout:
×
1138
                        c.dbFailureCount++
×
1139
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1140
                        if c.dbFailureCount >= maxFailures {
×
1141
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1142
                        }
×
1143
                        return
×
1144
                }
1145
        }
1146

1147
        if c.dbFailureCount > 0 {
×
1148
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1149
                c.dbFailureCount = 0
×
1150
        }
×
1151
}
1152

1153
func (c *Controller) shutdown() {
×
1154
        utilruntime.HandleCrash()
×
1155

×
1156
        c.addOrUpdatePodQueue.ShutDown()
×
1157
        c.deletePodQueue.ShutDown()
×
1158
        c.updatePodSecurityQueue.ShutDown()
×
1159

×
1160
        c.addNamespaceQueue.ShutDown()
×
1161

×
1162
        c.addOrUpdateSubnetQueue.ShutDown()
×
1163
        c.deleteSubnetQueue.ShutDown()
×
1164
        c.updateSubnetStatusQueue.ShutDown()
×
1165
        c.syncVirtualPortsQueue.ShutDown()
×
1166

×
1167
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1168
        c.updateIPPoolStatusQueue.ShutDown()
×
1169
        c.deleteIPPoolQueue.ShutDown()
×
1170

×
1171
        c.addNodeQueue.ShutDown()
×
1172
        c.updateNodeQueue.ShutDown()
×
1173
        c.deleteNodeQueue.ShutDown()
×
1174

×
1175
        c.addServiceQueue.ShutDown()
×
1176
        c.deleteServiceQueue.ShutDown()
×
1177
        c.updateServiceQueue.ShutDown()
×
1178
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1179

×
1180
        c.addVlanQueue.ShutDown()
×
1181
        c.delVlanQueue.ShutDown()
×
1182
        c.updateVlanQueue.ShutDown()
×
1183

×
1184
        c.addOrUpdateVpcQueue.ShutDown()
×
1185
        c.updateVpcStatusQueue.ShutDown()
×
1186
        c.delVpcQueue.ShutDown()
×
1187

×
1188
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1189
        c.initVpcNatGatewayQueue.ShutDown()
×
1190
        c.delVpcNatGatewayQueue.ShutDown()
×
1191
        c.updateVpcEipQueue.ShutDown()
×
1192
        c.updateVpcFloatingIPQueue.ShutDown()
×
1193
        c.updateVpcDnatQueue.ShutDown()
×
1194
        c.updateVpcSnatQueue.ShutDown()
×
1195
        c.updateVpcSubnetQueue.ShutDown()
×
1196

×
1197
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1198
        c.delVpcEgressGatewayQueue.ShutDown()
×
1199

×
1200
        if c.config.EnableLb {
×
1201
                c.addSwitchLBRuleQueue.ShutDown()
×
1202
                c.delSwitchLBRuleQueue.ShutDown()
×
1203
                c.updateSwitchLBRuleQueue.ShutDown()
×
1204

×
1205
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1206
                c.delVpcDNSQueue.ShutDown()
×
1207
        }
×
1208

1209
        c.addIPQueue.ShutDown()
×
1210
        c.updateIPQueue.ShutDown()
×
1211
        c.delIPQueue.ShutDown()
×
1212

×
1213
        c.addVirtualIPQueue.ShutDown()
×
1214
        c.updateVirtualIPQueue.ShutDown()
×
1215
        c.updateVirtualParentsQueue.ShutDown()
×
1216
        c.delVirtualIPQueue.ShutDown()
×
1217

×
1218
        c.addIptablesEipQueue.ShutDown()
×
1219
        c.updateIptablesEipQueue.ShutDown()
×
1220
        c.resetIptablesEipQueue.ShutDown()
×
1221
        c.delIptablesEipQueue.ShutDown()
×
1222

×
1223
        c.addIptablesFipQueue.ShutDown()
×
1224
        c.updateIptablesFipQueue.ShutDown()
×
1225
        c.delIptablesFipQueue.ShutDown()
×
1226

×
1227
        c.addIptablesDnatRuleQueue.ShutDown()
×
1228
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1229
        c.delIptablesDnatRuleQueue.ShutDown()
×
1230

×
1231
        c.addIptablesSnatRuleQueue.ShutDown()
×
1232
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1233
        c.delIptablesSnatRuleQueue.ShutDown()
×
1234

×
1235
        c.addQoSPolicyQueue.ShutDown()
×
1236
        c.updateQoSPolicyQueue.ShutDown()
×
1237
        c.delQoSPolicyQueue.ShutDown()
×
1238

×
1239
        c.addOvnEipQueue.ShutDown()
×
1240
        c.updateOvnEipQueue.ShutDown()
×
1241
        c.resetOvnEipQueue.ShutDown()
×
1242
        c.delOvnEipQueue.ShutDown()
×
1243

×
1244
        c.addOvnFipQueue.ShutDown()
×
1245
        c.updateOvnFipQueue.ShutDown()
×
1246
        c.delOvnFipQueue.ShutDown()
×
1247

×
1248
        c.addOvnSnatRuleQueue.ShutDown()
×
1249
        c.updateOvnSnatRuleQueue.ShutDown()
×
1250
        c.delOvnSnatRuleQueue.ShutDown()
×
1251

×
1252
        c.addOvnDnatRuleQueue.ShutDown()
×
1253
        c.updateOvnDnatRuleQueue.ShutDown()
×
1254
        c.delOvnDnatRuleQueue.ShutDown()
×
1255

×
1256
        if c.config.EnableNP {
×
1257
                c.updateNpQueue.ShutDown()
×
1258
                c.deleteNpQueue.ShutDown()
×
1259
        }
×
1260
        if c.config.EnableANP {
×
1261
                c.addAnpQueue.ShutDown()
×
1262
                c.updateAnpQueue.ShutDown()
×
1263
                c.deleteAnpQueue.ShutDown()
×
1264

×
1265
                c.addBanpQueue.ShutDown()
×
1266
                c.updateBanpQueue.ShutDown()
×
1267
                c.deleteBanpQueue.ShutDown()
×
1268

×
1269
                c.addCnpQueue.ShutDown()
×
1270
                c.updateCnpQueue.ShutDown()
×
1271
                c.deleteCnpQueue.ShutDown()
×
1272
        }
×
1273

1274
        if c.config.EnableDNSNameResolver {
×
1275
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1276
                c.deleteDNSNameResolverQueue.ShutDown()
×
1277
        }
×
1278

1279
        c.addOrUpdateSgQueue.ShutDown()
×
1280
        c.delSgQueue.ShutDown()
×
1281
        c.syncSgPortsQueue.ShutDown()
×
1282

×
1283
        c.addOrUpdateCsrQueue.ShutDown()
×
1284

×
1285
        if c.config.EnableLiveMigrationOptimize {
×
1286
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1287
        }
×
1288
}
1289

1290
func (c *Controller) startWorkers(ctx context.Context) {
×
1291
        klog.Info("Starting workers")
×
1292

×
1293
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1294
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1295
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1296

×
1297
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1298
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1301
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1302
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1303
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1304
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1305
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1306
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1307
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1308
        // add default and join subnet and wait them ready
×
1309
        for range c.config.WorkerNum {
×
1310
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1311
        }
×
1312
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1313
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1314
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1315
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1316
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1317
                klog.Infof("wait for subnets %v ready", subnets)
×
1318

×
1319
                return c.allSubnetReady(subnets...)
×
1320
        })
×
1321
        if err != nil {
×
1322
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1323
        }
×
1324

1325
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1326
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1327
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1328

×
1329
        // run node worker before handle any pods
×
1330
        for range c.config.WorkerNum {
×
1331
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1332
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1333
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1334
        }
×
1335
        for {
×
1336
                ready := true
×
1337
                time.Sleep(3 * time.Second)
×
1338
                nodes, err := c.nodesLister.List(labels.Everything())
×
1339
                if err != nil {
×
1340
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1341
                }
×
1342
                for _, node := range nodes {
×
1343
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1344
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1345
                                ready = false
×
1346
                                break
×
1347
                        }
1348
                }
1349
                if ready {
×
1350
                        break
×
1351
                }
1352
        }
1353

1354
        if c.config.EnableLb {
×
1355
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1356
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1357
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1358

×
1359
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1360
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1361
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1362

×
1363
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1364
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1365
                go wait.Until(func() {
×
1366
                        c.resyncVpcDNSConfig()
×
1367
                }, 5*time.Second, ctx.Done())
×
1368
        }
1369

1370
        for range c.config.WorkerNum {
×
1371
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1372
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1373
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1374

×
1375
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1376
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1377
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1378
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1379
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1380

×
1381
                if c.config.EnableLb {
×
1382
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1383
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1384
                }
×
1385

1386
                if c.config.EnableNP {
×
1387
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1388
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1389
                }
×
1390

1391
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1392
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1393
        }
1394

1395
        if c.config.EnableEipSnat {
×
1396
                go wait.Until(func() {
×
1397
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1398
                        c.resyncExternalGateway()
×
1399
                }, time.Second, ctx.Done())
×
1400

1401
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1402
                c.OVNNbClient.MonitorBFD()
×
1403
        }
1404
        // TODO: we should merge these two vpc nat config into one config and resync them together
1405
        go wait.Until(func() {
×
1406
                c.resyncVpcNatGwConfig()
×
1407
        }, time.Second, ctx.Done())
×
1408

1409
        go wait.Until(func() {
×
1410
                c.resyncVpcNatConfig()
×
1411
        }, time.Second, ctx.Done())
×
1412

1413
        if c.config.GCInterval != 0 {
×
1414
                go wait.Until(func() {
×
1415
                        if err := c.markAndCleanLSP(); err != nil {
×
1416
                                klog.Errorf("gc lsp error: %v", err)
×
1417
                        }
×
1418
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1419
        }
1420

1421
        go wait.Until(func() {
×
1422
                if err := c.inspectPod(); err != nil {
×
1423
                        klog.Errorf("inspection error: %v", err)
×
1424
                }
×
1425
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1426

1427
        if c.config.EnableExternalVpc {
×
1428
                go wait.Until(func() {
×
1429
                        c.syncExternalVpc()
×
1430
                }, 5*time.Second, ctx.Done())
×
1431
        }
1432

1433
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1434
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1435
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1436
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1437

×
1438
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1439
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1441
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1442

×
1443
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1444
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1446

×
1447
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1448
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1450

×
1451
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1452
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1454

×
1455
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1456

×
1457
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1458
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1459
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1460

×
1461
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1462
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1463
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1464
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1465

×
1466
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1467
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1468
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1469
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1470

×
1471
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1472
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1473
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1474

×
1475
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1476
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1477
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1478

×
1479
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1480
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1481
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1482

×
1483
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1484
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1485
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1486

×
1487
        if c.config.EnableANP {
×
1488
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1489
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1490
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1491

×
1492
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1493
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1494
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1495

×
1496
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1497
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1498
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1499
        }
×
1500

1501
        if c.config.EnableDNSNameResolver {
×
1502
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1503
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1504
        }
×
1505

1506
        if c.config.EnableLiveMigrationOptimize {
×
1507
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1508
        }
×
1509

1510
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1511

×
1512
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1513
}
1514

1515
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1516
        for _, lsName := range subnets {
2✔
1517
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1518
                if err != nil {
1✔
1519
                        klog.Error(err)
×
1520
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1521
                }
×
1522

1523
                if !exist {
2✔
1524
                        return false, nil
1✔
1525
                }
1✔
1526
        }
1527

1528
        return true, nil
1✔
1529
}
1530

1531
func (c *Controller) initResourceOnce() {
×
1532
        c.registerSubnetMetrics()
×
1533

×
1534
        if err := c.initNodeChassis(); err != nil {
×
1535
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1536
        }
×
1537

1538
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1539
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1540
        }
×
1541
        if err := c.syncSecurityGroup(); err != nil {
×
1542
                util.LogFatalAndExit(err, "failed to sync security group")
×
1543
        }
×
1544

1545
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1546
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1547
        }
×
1548

1549
        if err := c.initVpcNatGw(); err != nil {
×
1550
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1551
        }
×
1552
        if c.config.EnableLb {
×
1553
                if err := c.initVpcDNSConfig(); err != nil {
×
1554
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1555
                }
×
1556
        }
1557

1558
        // remove resources in ovndb that not exist any more in kubernetes resources
1559
        // process gc at last in case of affecting other init process
1560
        if err := c.gc(); err != nil {
×
1561
                util.LogFatalAndExit(err, "failed to run gc")
×
1562
        }
×
1563
}
1564

1565
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1566
        item, shutdown := queue.Get()
×
1567
        if shutdown {
×
1568
                return false
×
1569
        }
×
1570

1571
        err := func(item T) error {
×
1572
                defer queue.Done(item)
×
1573
                if err := handler(item); err != nil {
×
1574
                        queue.AddRateLimited(item)
×
1575
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1576
                }
×
1577
                queue.Forget(item)
×
1578
                return nil
×
1579
        }(item)
1580
        if err != nil {
×
1581
                utilruntime.HandleError(err)
×
1582
                return true
×
1583
        }
×
1584
        return true
×
1585
}
1586

1587
func getWorkItemKey(obj any) string {
×
1588
        switch v := obj.(type) {
×
1589
        case string:
×
1590
                return v
×
1591
        case *vpcService:
×
1592
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1593
        case *AdminNetworkPolicyChangedDelta:
×
1594
                return v.key
×
1595
        case *SwitchLBRuleInfo:
×
1596
                return v.Name
×
1597
        default:
×
1598
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1599
                if err != nil {
×
1600
                        utilruntime.HandleError(err)
×
1601
                        return ""
×
1602
                }
×
1603
                return key
×
1604
        }
1605
}
1606

1607
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1608
        return func() {
×
1609
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1610
                }
×
1611
        }
1612
}
1613

1614
// apiResourceExists checks if all specified kinds exist in the given group version.
1615
// It returns true if all kinds are found, false otherwise.
1616
// Parameters:
1617
// - discoveryClient: The discovery client to use for querying API resources.
1618
// - gv: The group version string (e.g., "apps/v1").
1619
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1620
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1621
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1622
        if err != nil {
×
1623
                if k8serrors.IsNotFound(err) {
×
1624
                        return false, nil
×
1625
                }
×
1626
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1627
        }
1628

1629
        existingKinds := set.New[string]()
×
1630
        for _, apiResource := range apiResourceLists.APIResources {
×
1631
                existingKinds.Insert(apiResource.Kind)
×
1632
        }
×
1633

1634
        return existingKinds.HasAll(kinds...), nil
×
1635
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc