• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 25472384490

07 May 2026 02:20AM UTC coverage: 24.899% (+0.06%) from 24.835%
25472384490

push

github

web-flow
feat(servicecidr): support K8s multiple ServiceCIDR (KEP-1880) (#6690)

* feat(servicecidr): support Kubernetes multiple ServiceCIDR (KEP-1880)

Watch networking.k8s.io/v1 ServiceCIDR objects and merge them with the
--service-cluster-ip-range flag value into a single source of truth used
by every Service-CIDR consumer (U2O policy routes, vpc-lb init
containers, vpc-nat-gw routes, daemon ipset/iptables). The flag now
serves as a startup fallback that yields once the API observes any valid
entry, and re-engages if the API set ever empties — so old clusters
without the API behave exactly as before, while 1.33+ clusters converge
to the API-advertised set and pick up dynamic add/remove.

ServiceCIDR API discovery uses APIResourceExists with a 10s ticker
fallback (same shape as the NAD/KubeVirt scaffolds), so missing API on
older clusters is a no-op rather than an error. RBAC for
networking.k8s.io/servicecidrs is added to system:ovn and
system:kube-ovn-cni in install.sh and both Helm charts.

Notable behavior changes:
- vpc-lb deployment is now upserted with a ServiceCIDR hash annotation;
  changing the merged set rolls the pod via the existing Recreate
  strategy.
- U2O no-LB policy routes are pruned at the start of every reconcile
  (policy-only delete, port groups untouched) so shrinking the set no
  longer leaves stale OVN entries.
- Existing VPC NAT gateways are intentionally not re-enqueued on
  ServiceCIDR change; their routes only refresh when the pod is
  recreated by other means. Newly created NAT gateways pick up the
  current store via their own add path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* refactor(servicecidr): consolidate duplicates and use stdlib helpers

- Promote readyServiceCIDRs to util.ReadyServiceCIDRs so controller and
  daemon share one source of truth.
- Drop custom equalStringSlice in favour of slices.Equal.
- Extract vpcLbInitContainers helpe... (continued)

124 of 467 new or added lines in 12 files covered. (26.55%)

2 existing lines in 1 file now uncovered.

14198 of 57023 relevant lines covered (24.9%)

0.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.13
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/apimachinery/pkg/util/wait"
21
        kubeinformers "k8s.io/client-go/informers"
22
        "k8s.io/client-go/kubernetes/scheme"
23
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
24
        appsv1 "k8s.io/client-go/listers/apps/v1"
25
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
26
        v1 "k8s.io/client-go/listers/core/v1"
27
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
28
        netv1 "k8s.io/client-go/listers/networking/v1"
29
        "k8s.io/client-go/tools/cache"
30
        "k8s.io/client-go/tools/record"
31
        "k8s.io/client-go/util/workqueue"
32
        "k8s.io/klog/v2"
33
        "k8s.io/utils/keymutex"
34
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
35
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
36
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
37
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
38
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
39

40
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
41
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
42
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
43
        "github.com/kubeovn/kube-ovn/pkg/informer"
44
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
45
        "github.com/kubeovn/kube-ovn/pkg/ovs"
46
        "github.com/kubeovn/kube-ovn/pkg/util"
47
)
48

49
// controllerAgentName is the component name set on the corev1.EventSource of
// the event recorder created in Run.
const controllerAgentName = "kube-ovn-controller"
50

51
// Short string tags, one per object kind (logical switch, logical router,
// port group, network policy, security group, ...).
// NOTE(review): their consumers are outside this chunk — presumably used as
// keys/values when labelling OVN records; confirm against the call sites.
const (
52
        logicalSwitchKey              = "ls"
53
        logicalRouterKey              = "lr"
54
        portGroupKey                  = "pg"
55
        networkPolicyKey              = "np"
56
        sgKey                         = "sg"
57
        sgsKey                        = "security_groups"
58
        u2oKey                        = "u2o"
59
        adminNetworkPolicyKey         = "anp"
60
        baselineAdminNetworkPolicyKey = "banp"
61
        ippoolKey                     = "ippool"
62
        clusterNetworkPolicyKey       = "cnp"
63
)
64

65
// Controller is the kube-ovn main controller that watches ns/pod/node/svc/ep
// and operates OVN.
//
// For each watched resource kind it holds a lister, an informer "synced"
// check, the typed rate-limiting work queues for that kind and, where present,
// a per-key mutex. It also holds the OVN clients, the IPAM instance, the event
// recorder and the informer factories.
66
type Controller struct {
67
        // config is the Configuration passed to Run.
        config *Configuration
68

69
        // IPAM and named-port state, plus priority<->name lookup maps.
        // NOTE(review): anp*/bnp* presumably refer to (Baseline)AdminNetworkPolicy
        // and priorityMapMutex presumably guards the four maps — confirm at use sites.
        ipam             *ovnipam.IPAM
70
        namedPort        *NamedPort
71
        anpPrioNameMap   map[int32]string
72
        anpNamePrioMap   map[string]int32
73
        bnpPrioNameMap   map[int32]string
74
        bnpNamePrioMap   map[string]int32
75
        priorityMapMutex sync.RWMutex
76

77
        // Clients of type ovs.NbClient / ovs.SbClient.
        OVNNbClient ovs.NbClient
78
        OVNSbClient ovs.SbClient
79

80
        // ExternalGatewayType defines the external gateway type, centralized
81
        ExternalGatewayType string
82

83
        // Pods: lister, sync check, indexer, work queues and key mutex.
        // The deleting*ObjMap fields are concurrent maps keyed by string that
        // hold Pod and Node objects respectively.
        podsLister             v1.PodLister
84
        podsSynced             cache.InformerSynced
85
        podIndexer             cache.Indexer
86
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
87
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
88
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
89
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
90
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
91
        podKeyMutex            keymutex.KeyMutex
92

93
        // VPCs: lister, sync check, work queues and key mutex.
        vpcsLister           kubeovnlister.VpcLister
94
        vpcSynced            cache.InformerSynced
95
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
96
        vpcLastPoliciesMap   *xsync.Map[string, string]
97
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
98
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
99
        vpcKeyMutex          keymutex.KeyMutex
100

101
        // VPC NAT gateways: lister, sync check, work queues and two key mutexes.
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
102
        vpcNatGatewaySynced           cache.InformerSynced
103
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
104
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
105
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
106
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
107
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
108
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
109
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
110
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
111
        vpcNatGwKeyMutex              keymutex.KeyMutex
112
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
113

114
        // VPC egress gateways: lister, sync check, work queues and key mutex.
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
115
        vpcEgressGatewaySynced           cache.InformerSynced
116
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
117
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
118
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
119

120
        // BgpConf / EvpnConf: listers and sync checks only.
        bgpConfLister  kubeovnlister.BgpConfLister
121
        bgpConfSynced  cache.InformerSynced
122
        evpnConfLister kubeovnlister.EvpnConfLister
123
        evpnConfSynced cache.InformerSynced
124

125
        // SwitchLBRules: lister, sync check and work queues.
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
126
        switchLBRuleSynced      cache.InformerSynced
127
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
128
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
129
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SwitchLBRuleInfo]
130

131
        // VpcDns: lister, sync check and work queues.
        vpcDNSLister           kubeovnlister.VpcDnsLister
132
        vpcDNSSynced           cache.InformerSynced
133
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
134
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
135

136
        // Subnets: lister, sync check, work queues and key mutex.
        subnetsLister           kubeovnlister.SubnetLister
137
        subnetSynced            cache.InformerSynced
138
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
139
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
140
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
141
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
142
        subnetKeyMutex          keymutex.KeyMutex
143

144
        // IPPools: lister, sync check, work queues and key mutex.
        ippoolLister            kubeovnlister.IPPoolLister
145
        ippoolSynced            cache.InformerSynced
146
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
147
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
148
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
149
        ippoolKeyMutex          keymutex.KeyMutex
150

151
        // IPs: lister, sync check and work queues.
        ipsLister     kubeovnlister.IPLister
152
        ipSynced      cache.InformerSynced
153
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
154
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
155
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
156

157
        // Vips (virtual IPs): lister, sync check and work queues.
        virtualIpsLister          kubeovnlister.VipLister
158
        virtualIpsSynced          cache.InformerSynced
159
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
160
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
161
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
162
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
163

164
        // IptablesEIPs: lister, sync check and work queues.
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
165
        iptablesEipSynced      cache.InformerSynced
166
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
167
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
168
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
169
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
170

171
        // IptablesFIPRules: lister, sync check and work queues.
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
172
        iptablesFipSynced      cache.InformerSynced
173
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
174
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
175
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
176

177
        // IptablesDnatRules: lister, sync check and work queues.
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
178
        iptablesDnatRuleSynced      cache.InformerSynced
179
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
180
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
181
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
182

183
        // IptablesSnatRules: lister, sync check and work queues.
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
184
        iptablesSnatRuleSynced      cache.InformerSynced
185
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
186
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
187
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
188

189
        // OvnEips: lister, sync check and work queues.
        ovnEipsLister     kubeovnlister.OvnEipLister
190
        ovnEipSynced      cache.InformerSynced
191
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
192
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
193
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
194
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
195

196
        // OvnFips: lister, sync check and work queues.
        ovnFipsLister     kubeovnlister.OvnFipLister
197
        ovnFipSynced      cache.InformerSynced
198
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
199
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
200
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
201

202
        // OvnSnatRules: lister, sync check and work queues.
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
203
        ovnSnatRuleSynced      cache.InformerSynced
204
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
205
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
206
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
207

208
        // OvnDnatRules: lister, sync check and work queues.
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
209
        ovnDnatRuleSynced      cache.InformerSynced
210
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
211
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
212
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
213

214
        // ProviderNetworks: lister and sync check only.
        providerNetworksLister kubeovnlister.ProviderNetworkLister
215
        providerNetworkSynced  cache.InformerSynced
216

217
        // Vlans: lister, sync check, work queues and key mutex.
        vlansLister     kubeovnlister.VlanLister
218
        vlanSynced      cache.InformerSynced
219
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
220
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
221
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
222
        vlanKeyMutex    keymutex.KeyMutex
223

224
        // Namespaces: lister, sync check, work queue and key mutex.
        namespacesLister  v1.NamespaceLister
225
        namespacesSynced  cache.InformerSynced
226
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
227
        nsKeyMutex        keymutex.KeyMutex
228

229
        // Nodes: lister, sync check, work queues and key mutex.
        nodesLister     v1.NodeLister
230
        nodesSynced     cache.InformerSynced
231
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
232
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
233
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
234
        nodeKeyMutex    keymutex.KeyMutex
235

236
        // Services: lister, sync check, work queues and key mutex.
        servicesLister     v1.ServiceLister
237
        serviceSynced      cache.InformerSynced
238
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
239
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
240
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
241
        svcKeyMutex        keymutex.KeyMutex
242

243
        // EndpointSlices: lister, sync check, indexer, work queue and key mutex.
        endpointSlicesLister          discoveryv1.EndpointSliceLister
244
        endpointSlicesSynced          cache.InformerSynced
245
        epsIndexer                    cache.Indexer
246
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
247
        epKeyMutex                    keymutex.KeyMutex
248

249
        // Deployments: lister and sync check only.
        deploymentsLister appsv1.DeploymentLister
250
        deploymentsSynced cache.InformerSynced
251

252
        // NetworkPolicies: lister, sync check, indexer, work queues and key mutex.
        npsLister     netv1.NetworkPolicyLister
253
        npsSynced     cache.InformerSynced
254
        npIndexer     cache.Indexer
255
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
256
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
257
        npKeyMutex    keymutex.KeyMutex
258

259
        // SecurityGroups: lister, sync check, work queues and key mutex.
        sgsLister          kubeovnlister.SecurityGroupLister
260
        sgSynced           cache.InformerSynced
261
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
262
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
263
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
264
        sgKeyMutex         keymutex.KeyMutex
265

266
        // QoSPolicies: lister, sync check and work queues.
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
267
        qosPolicySynced      cache.InformerSynced
268
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
269
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
270
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
271

272
        // ConfigMaps: lister and sync check only.
        configMapsLister v1.ConfigMapLister
273
        configMapsSynced cache.InformerSynced
274

275
        // AdminNetworkPolicies: lister, sync check, work queues and key mutex.
        anpsLister     anplister.AdminNetworkPolicyLister
276
        anpsSynced     cache.InformerSynced
277
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
278
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
279
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
280
        anpKeyMutex    keymutex.KeyMutex
281

282
        // DNSNameResolvers: lister, sync check and work queues.
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
283
        dnsNameResolversSynced          cache.InformerSynced
284
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
285
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
286

287
        // BaselineAdminNetworkPolicies: lister, sync check, work queues and key mutex.
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
288
        banpsSynced     cache.InformerSynced
289
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
290
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
291
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
292
        banpKeyMutex    keymutex.KeyMutex
293

294
        // ClusterNetworkPolicies (v1alpha2): lister, sync check, work queues and key mutex.
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
295
        cnpsSynced     cache.InformerSynced
296
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
297
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
298
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
299
        cnpKeyMutex    keymutex.KeyMutex
300

301
        // CertificateSigningRequests: lister, sync check and work queue.
        csrLister           certListerv1.CertificateSigningRequestLister
302
        csrSynced           cache.InformerSynced
303
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
304

305
        // KubeVirt: work queues and the KubeVirt informer factory.
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
306
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
307
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
308

309
        // NetworkAttachmentDefinitions: lister, sync check and informer factory.
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
310
        netAttachSynced          cache.InformerSynced
311
        netAttachInformerFactory netAttach.SharedInformerFactory
312

313
        // ServiceCIDRs: store, lister, sync check and a dedicated informer factory.
        serviceCIDRStore           *util.ServiceCIDRStore
314
        serviceCIDRLister          netv1.ServiceCIDRLister
315
        serviceCIDRSynced          cache.InformerSynced
316
        serviceCIDRInformerFactory kubeinformers.SharedInformerFactory
317

318
        // Event recorder and the shared informer factories.
        recorder               record.EventRecorder
319
        informerFactory        kubeinformers.SharedInformerFactory
320
        cmInformerFactory      kubeinformers.SharedInformerFactory
321
        deployInformerFactory  kubeinformers.SharedInformerFactory
322
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
323
        anpInformerFactory     anpinformer.SharedInformerFactory
324

325
        // Database health check
        // NOTE(review): presumably counts failed health checks; confirm at the use site.
326
        dbFailureCount int
327

328
        // distributedSubnetNeedSync is an atomic flag.
        // NOTE(review): where it is set and read is outside this chunk.
        distributedSubnetNeedSync atomic.Bool
329
}
330

331
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
1✔
332
        if rateLimiter == nil {
2✔
333
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
1✔
334
        }
1✔
335
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
1✔
336
}
337

338
// Run creates and runs a new ovn controller
339
func Run(ctx context.Context, config *Configuration) {
×
340
        klog.V(4).Info("Creating event broadcaster")
×
341
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
342
        eventBroadcaster.StartLogging(klog.Infof)
×
343
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events(metav1.NamespaceAll)})
×
344
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
345
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
346
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
347
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
348
        )
×
349

×
350
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
351
        if err != nil {
×
352
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
353
        }
×
354

355
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
356
                kubeinformers.WithTransform(util.TrimPodForController),
×
357
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
358
                        listOption.AllowWatchBookmarks = true
×
359
                }))
×
360
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
361
                kubeinformers.WithNamespace(config.PodNamespace),
×
362
                kubeinformers.WithTransform(util.TrimManagedFields),
×
363
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
364
                        listOption.AllowWatchBookmarks = true
×
365
                }))
×
366
        // deployment informer used to list/watch vpc egress gateway workloads
367
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
368
                kubeinformers.WithTransform(util.TrimManagedFields),
×
369
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
370
                        listOption.AllowWatchBookmarks = true
×
371
                        listOption.LabelSelector = selector.String()
×
372
                }))
×
373
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
374
                kubeovninformer.WithTransform(util.TrimManagedFields),
×
375
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
376
                        listOption.AllowWatchBookmarks = true
×
377
                }))
×
378
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
379
                anpinformer.WithTransform(util.TrimManagedFields),
×
380
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
381
                        listOption.AllowWatchBookmarks = true
×
382
                }))
×
383
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
384
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
385
                        listOption.AllowWatchBookmarks = true
×
386
                }),
×
387
        )
388
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactoryWithOptions(config.KubevirtClient.RestClient(), config.KubevirtClient,
×
389
                informer.WithTransform(util.TrimManagedFields),
×
390
        )
×
NEW
391
        // Dedicated factory so that on clusters without the ServiceCIDR API the
×
NEW
392
        // failed list/watch does not contaminate the main informer factory.
×
NEW
393
        serviceCIDRInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
×
NEW
394
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
NEW
395
                        listOption.AllowWatchBookmarks = true
×
NEW
396
                }),
×
397
        )
398

399
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
400
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
401
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
402
        bgpConfInformer := kubeovnInformerFactory.Kubeovn().V1().BgpConves()
×
403
        evpnConfInformer := kubeovnInformerFactory.Kubeovn().V1().EvpnConves()
×
404
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
405
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
406
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
407
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
408
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
409
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
410
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
411
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
412
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
413
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
414
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
415
        podInformer := informerFactory.Core().V1().Pods()
×
416
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
417
        nodeInformer := informerFactory.Core().V1().Nodes()
×
418
        serviceInformer := informerFactory.Core().V1().Services()
×
419
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
420
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
421
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
422
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
423
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
424
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
425
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
426
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
427
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
428
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
429
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
430
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
431
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
432
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
433
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
434
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
435
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
436

×
437
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
438
        controller := &Controller{
×
439
                config:             config,
×
440
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
441
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
442
                ipam:               ovnipam.NewIPAM(),
×
443
                namedPort:          NewNamedPort(),
×
444

×
445
                vpcsLister:           vpcInformer.Lister(),
×
446
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
447
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
448
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
449
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
450
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
451
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
452

×
453
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
454
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
455
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
456
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
457
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
458
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
459
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
460
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
461
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
462
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
463
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
464
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
465
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
466
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
467
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
468
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
469
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
470

×
471
                bgpConfLister:  bgpConfInformer.Lister(),
×
472
                bgpConfSynced:  bgpConfInformer.Informer().HasSynced,
×
473
                evpnConfLister: evpnConfInformer.Lister(),
×
474
                evpnConfSynced: evpnConfInformer.Informer().HasSynced,
×
475

×
476
                subnetsLister:           subnetInformer.Lister(),
×
477
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
478
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
479
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
480
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
481
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
482
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
483

×
484
                ippoolLister:            ippoolInformer.Lister(),
×
485
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
486
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
487
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
488
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
489
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
490

×
491
                ipsLister:     ipInformer.Lister(),
×
492
                ipSynced:      ipInformer.Informer().HasSynced,
×
493
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
494
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
495
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
496

×
497
                virtualIpsLister:          virtualIPInformer.Lister(),
×
498
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
499
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
500
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
501
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
502
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
503

×
504
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
505
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
506
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
507
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
508
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
509
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
510

×
511
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
512
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
513
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
514
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
515
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
516

×
517
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
518
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
519
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
520
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
521
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
522

×
523
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
524
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
525
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
526
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
527
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
528

×
529
                vlansLister:     vlanInformer.Lister(),
×
530
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
531
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
532
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
533
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
534
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
535

×
536
                providerNetworksLister: providerNetworkInformer.Lister(),
×
537
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
538

×
539
                podsLister:          podInformer.Lister(),
×
540
                podsSynced:          podInformer.Informer().HasSynced,
×
541
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
542
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
543
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
544
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
545
                                Name:          "DeletePod",
×
546
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
547
                        },
×
548
                ),
×
549
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
550
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
551

×
552
                namespacesLister:  namespaceInformer.Lister(),
×
553
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
554
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
555
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
556

×
557
                nodesLister:     nodeInformer.Lister(),
×
558
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
559
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
560
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
561
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
562
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
563

×
564
                servicesLister:     serviceInformer.Lister(),
×
565
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
566
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
567
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
568
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
569
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
570

×
571
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
572
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
573
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
574
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
575

×
576
                deploymentsLister: deploymentInformer.Lister(),
×
577
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
578

×
579
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
580
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
581
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
582
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
583
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
584

×
585
                configMapsLister: configMapInformer.Lister(),
×
586
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
587

×
588
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
589
                sgsLister:          sgInformer.Lister(),
×
590
                sgSynced:           sgInformer.Informer().HasSynced,
×
591
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
592
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
593
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
594

×
595
                ovnEipsLister:     ovnEipInformer.Lister(),
×
596
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
597
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
598
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
599
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
600
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
601

×
602
                ovnFipsLister:     ovnFipInformer.Lister(),
×
603
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
604
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
605
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
606
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
607

×
608
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
609
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
610
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
611
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
612
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
613

×
614
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
615
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
616
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
617
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
618
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
619

×
620
                csrLister:           csrInformer.Lister(),
×
621
                csrSynced:           csrInformer.Informer().HasSynced,
×
622
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
623

×
624
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
625
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
626
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
627

×
628
                netAttachLister:          netAttachInformer.Lister(),
×
629
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
630
                netAttachInformerFactory: attachNetInformerFactory,
×
631

×
NEW
632
                serviceCIDRStore:           util.NewServiceCIDRStore(config.ServiceClusterIPRange),
×
NEW
633
                serviceCIDRInformerFactory: serviceCIDRInformerFactory,
×
NEW
634

×
635
                recorder:               recorder,
×
636
                informerFactory:        informerFactory,
×
637
                cmInformerFactory:      cmInformerFactory,
×
638
                deployInformerFactory:  deployInformerFactory,
×
639
                kubeovnInformerFactory: kubeovnInformerFactory,
×
640
                anpInformerFactory:     anpInformerFactory,
×
641
        }
×
642

×
643
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
644
                config.OvnNbAddr,
×
645
                config.OvnTimeout,
×
646
                config.OvsDbConnectTimeout,
×
647
                config.OvsDbInactivityTimeout,
×
648
                config.OvsDbConnectMaxRetry,
×
649
        ); err != nil {
×
650
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
651
        }
×
652
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
653
                config.OvnSbAddr,
×
654
                config.OvnTimeout,
×
655
                config.OvsDbConnectTimeout,
×
656
                config.OvsDbInactivityTimeout,
×
657
                config.OvsDbConnectMaxRetry,
×
658
        ); err != nil {
×
659
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
660
        }
×
661
        if config.EnableLb {
×
662
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
663
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
664
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
665
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
666
                        "DeleteSwitchLBRule",
×
667
                        workqueue.NewTypedMaxOfRateLimiter(
×
668
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
669
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
670
                        ),
×
671
                )
×
672
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
673
                        "UpdateSwitchLBRule",
×
674
                        workqueue.NewTypedMaxOfRateLimiter(
×
675
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SwitchLBRuleInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
676
                                &workqueue.TypedBucketRateLimiter[*SwitchLBRuleInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
677
                        ),
×
678
                )
×
679

×
680
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
681
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
682
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
683
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
684
        }
×
685

686
        if config.EnableNP {
×
687
                controller.npsLister = npInformer.Lister()
×
688
                controller.npsSynced = npInformer.Informer().HasSynced
×
689
                controller.npIndexer = npInformer.Informer().GetIndexer()
×
690
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
691
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
692
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
693
        }
×
694

695
        if config.EnableANP {
×
696
                controller.anpsLister = anpInformer.Lister()
×
697
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
698
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
699
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
700
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
701
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
702

×
703
                controller.banpsLister = banpInformer.Lister()
×
704
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
705
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
706
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
707
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
708
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
709

×
710
                controller.cnpsLister = cnpInformer.Lister()
×
711
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
712
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
713
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
714
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
715
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
716
        }
×
717

718
        if config.EnableDNSNameResolver {
×
719
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
720
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
721
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
722
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
723
        }
×
724

725
        if err := controller.setupIndexers(podInformer.Informer(), endpointSliceInformer.Informer()); err != nil {
×
726
                util.LogFatalAndExit(err, "failed to set up informer indexers")
×
727
        }
×
728

729
        defer controller.shutdown()
×
730
        klog.Info("Starting OVN controller")
×
731

×
732
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
733
        // NAD CRD is optional, so we check if it exists before starting the informer
×
734
        controller.StartNetAttachInformerFactory(ctx)
×
735

×
NEW
736
        // ServiceCIDR (networking.k8s.io/v1) is GA in K8s 1.33; older clusters
×
NEW
737
        // don't have the API at all. Best-effort start with periodic retry.
×
NEW
738
        controller.StartServiceCIDRInformerFactory(ctx)
×
NEW
739

×
740
        // Wait for the caches to be synced before starting workers
×
741
        controller.informerFactory.Start(ctx.Done())
×
742
        controller.cmInformerFactory.Start(ctx.Done())
×
743
        controller.deployInformerFactory.Start(ctx.Done())
×
744
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
745
        controller.anpInformerFactory.Start(ctx.Done())
×
746
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
747

×
748
        klog.Info("Waiting for informer caches to sync")
×
749
        cacheSyncs := []cache.InformerSynced{
×
750
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
751
                controller.bgpConfSynced, controller.evpnConfSynced,
×
752
                controller.vpcSynced, controller.subnetSynced,
×
753
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
754
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
755
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
756
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
757
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
758
                controller.ovnDnatRuleSynced,
×
759
        }
×
760
        if controller.config.EnableLb {
×
761
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
762
        }
×
763
        if controller.config.EnableNP {
×
764
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
765
        }
×
766
        if controller.config.EnableANP {
×
767
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
768
        }
×
769
        if controller.config.EnableDNSNameResolver {
×
770
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
771
        }
×
772

773
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
774
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
775
        }
×
776

777
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
778
                AddFunc:    controller.enqueueAddPod,
×
779
                DeleteFunc: controller.enqueueDeletePod,
×
780
                UpdateFunc: controller.enqueueUpdatePod,
×
781
        }); err != nil {
×
782
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
783
        }
×
784

785
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
786
                AddFunc:    controller.enqueueAddNamespace,
×
787
                UpdateFunc: controller.enqueueUpdateNamespace,
×
788
                DeleteFunc: controller.enqueueDeleteNamespace,
×
789
        }); err != nil {
×
790
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
791
        }
×
792

793
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
794
                AddFunc:    controller.enqueueAddNode,
×
795
                UpdateFunc: controller.enqueueUpdateNode,
×
796
                DeleteFunc: controller.enqueueDeleteNode,
×
797
        }); err != nil {
×
798
                util.LogFatalAndExit(err, "failed to add node event handler")
×
799
        }
×
800

801
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
802
                AddFunc:    controller.enqueueAddService,
×
803
                DeleteFunc: controller.enqueueDeleteService,
×
804
                UpdateFunc: controller.enqueueUpdateService,
×
805
        }); err != nil {
×
806
                util.LogFatalAndExit(err, "failed to add service event handler")
×
807
        }
×
808

809
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
810
                AddFunc:    controller.enqueueAddEndpointSlice,
×
811
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
812
        }); err != nil {
×
813
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
814
        }
×
815

816
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
817
                AddFunc:    controller.enqueueAddDeployment,
×
818
                UpdateFunc: controller.enqueueUpdateDeployment,
×
819
        }); err != nil {
×
820
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
821
        }
×
822

823
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
824
                AddFunc:    controller.enqueueAddVpc,
×
825
                UpdateFunc: controller.enqueueUpdateVpc,
×
826
                DeleteFunc: controller.enqueueDelVpc,
×
827
        }); err != nil {
×
828
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
829
        }
×
830

831
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
832
                AddFunc:    controller.enqueueAddVpcNatGw,
×
833
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
834
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
835
        }); err != nil {
×
836
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
837
        }
×
838

839
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
840
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
841
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
842
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
843
        }); err != nil {
×
844
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
845
        }
×
846

847
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
848
                AddFunc:    controller.enqueueAddSubnet,
×
849
                UpdateFunc: controller.enqueueUpdateSubnet,
×
850
                DeleteFunc: controller.enqueueDeleteSubnet,
×
851
        }); err != nil {
×
852
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
853
        }
×
854

855
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
856
                AddFunc:    controller.enqueueAddIPPool,
×
857
                UpdateFunc: controller.enqueueUpdateIPPool,
×
858
                DeleteFunc: controller.enqueueDeleteIPPool,
×
859
        }); err != nil {
×
860
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
861
        }
×
862

863
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
864
                AddFunc:    controller.enqueueAddIP,
×
865
                UpdateFunc: controller.enqueueUpdateIP,
×
866
                DeleteFunc: controller.enqueueDelIP,
×
867
        }); err != nil {
×
868
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
869
        }
×
870

871
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
872
                AddFunc:    controller.enqueueAddVlan,
×
873
                DeleteFunc: controller.enqueueDelVlan,
×
874
                UpdateFunc: controller.enqueueUpdateVlan,
×
875
        }); err != nil {
×
876
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
877
        }
×
878

879
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
880
                AddFunc:    controller.enqueueAddSg,
×
881
                DeleteFunc: controller.enqueueDeleteSg,
×
882
                UpdateFunc: controller.enqueueUpdateSg,
×
883
        }); err != nil {
×
884
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
885
        }
×
886

887
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
888
                AddFunc:    controller.enqueueAddVirtualIP,
×
889
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
890
                DeleteFunc: controller.enqueueDelVirtualIP,
×
891
        }); err != nil {
×
892
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
893
        }
×
894

895
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
896
                AddFunc:    controller.enqueueAddIptablesEip,
×
897
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
898
                DeleteFunc: controller.enqueueDelIptablesEip,
×
899
        }); err != nil {
×
900
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
901
        }
×
902

903
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
904
                AddFunc:    controller.enqueueAddIptablesFip,
×
905
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
906
                DeleteFunc: controller.enqueueDelIptablesFip,
×
907
        }); err != nil {
×
908
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
909
        }
×
910

911
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
912
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
913
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
914
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
915
        }); err != nil {
×
916
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
917
        }
×
918

919
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
920
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
921
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
922
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
923
        }); err != nil {
×
924
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
925
        }
×
926

927
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
928
                AddFunc:    controller.enqueueAddOvnEip,
×
929
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
930
                DeleteFunc: controller.enqueueDelOvnEip,
×
931
        }); err != nil {
×
932
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
933
        }
×
934

935
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
936
                AddFunc:    controller.enqueueAddOvnFip,
×
937
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
938
                DeleteFunc: controller.enqueueDelOvnFip,
×
939
        }); err != nil {
×
940
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
941
        }
×
942

943
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
944
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
945
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
946
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
947
        }); err != nil {
×
948
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
949
        }
×
950

951
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
952
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
953
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
954
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
955
        }); err != nil {
×
956
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
957
        }
×
958

959
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
960
                AddFunc:    controller.enqueueAddQoSPolicy,
×
961
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
962
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
963
        }); err != nil {
×
964
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
965
        }
×
966

967
        if config.EnableLb {
×
968
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
969
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
970
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
971
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
972
                }); err != nil {
×
973
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
974
                }
×
975

976
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
977
                        AddFunc:    controller.enqueueAddVpcDNS,
×
978
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
979
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
980
                }); err != nil {
×
981
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
982
                }
×
983
        }
984

985
        if config.EnableNP {
×
986
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
987
                        AddFunc:    controller.enqueueAddNp,
×
988
                        UpdateFunc: controller.enqueueUpdateNp,
×
989
                        DeleteFunc: controller.enqueueDeleteNp,
×
990
                }); err != nil {
×
991
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
992
                }
×
993
        }
994

995
        if config.EnableANP {
×
996
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
997
                        AddFunc:    controller.enqueueAddAnp,
×
998
                        UpdateFunc: controller.enqueueUpdateAnp,
×
999
                        DeleteFunc: controller.enqueueDeleteAnp,
×
1000
                }); err != nil {
×
1001
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
1002
                }
×
1003

1004
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1005
                        AddFunc:    controller.enqueueAddBanp,
×
1006
                        UpdateFunc: controller.enqueueUpdateBanp,
×
1007
                        DeleteFunc: controller.enqueueDeleteBanp,
×
1008
                }); err != nil {
×
1009
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
1010
                }
×
1011

1012
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1013
                        AddFunc:    controller.enqueueAddCnp,
×
1014
                        UpdateFunc: controller.enqueueUpdateCnp,
×
1015
                        DeleteFunc: controller.enqueueDeleteCnp,
×
1016
                }); err != nil {
×
1017
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
1018
                }
×
1019

1020
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
1021
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1022
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1023
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
1024
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
1025
        }
1026

1027
        if config.EnableDNSNameResolver {
×
1028
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1029
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
1030
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
1031
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
1032
                }); err != nil {
×
1033
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
1034
                }
×
1035
        }
1036

1037
        if config.EnableOVNIPSec {
×
1038
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1039
                        AddFunc:    controller.enqueueAddCsr,
×
1040
                        UpdateFunc: controller.enqueueUpdateCsr,
×
1041
                        // no need to add delete func for csr
×
1042
                }); err != nil {
×
1043
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
1044
                }
×
1045
        }
1046

1047
        controller.Run(ctx)
×
1048
}
1049

1050
// Run will set up the event handlers for types we are interested in, as well
1051
// as syncing informer caches and starting workers. It will block until stopCh
1052
// is closed, at which point it will shutdown the workqueue and wait for
1053
// workers to finish processing their current work items.
1054
func (c *Controller) Run(ctx context.Context) {
×
1055
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1056
        // Otherwise, the init process should be placed after all workers have already started working
×
1057
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
1058
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1059
        }
×
1060

1061
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
1062
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1063
        }
×
1064

1065
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
1066
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1067
        }
×
1068

1069
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
1070
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1071
        }
×
1072

1073
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
1074
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1075
        }
×
1076

1077
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
1078
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1079
        }
×
1080

1081
        if err := c.InitOVN(); err != nil {
×
1082
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
1083
        }
×
1084

1085
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1086
        if err := c.syncIPCR(); err != nil {
×
1087
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1088
        }
×
1089

1090
        if err := c.syncFinalizers(); err != nil {
×
1091
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1092
        }
×
1093

1094
        if err := c.InitIPAM(); err != nil {
×
1095
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1096
        }
×
1097

1098
        if err := c.syncNodeRoutes(); err != nil {
×
1099
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1100
        }
×
1101

1102
        if err := c.syncSubnetCR(); err != nil {
×
1103
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1104
        }
×
1105

1106
        if err := c.syncVlanCR(); err != nil {
×
1107
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1108
        }
×
1109

1110
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1111
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
1112
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
1113
                }
×
1114
        }
1115

1116
        // start workers to do all the network operations
1117
        c.startWorkers(ctx)
×
1118

×
1119
        c.initResourceOnce()
×
1120
        <-ctx.Done()
×
1121
        klog.Info("Shutting down workers")
×
1122

×
1123
        c.OVNNbClient.Close()
×
1124
        c.OVNSbClient.Close()
×
1125
}
1126

1127
func (c *Controller) dbStatus() {
×
1128
        const maxFailures = 5
×
1129

×
1130
        done := make(chan error, 2)
×
1131
        go func() {
×
1132
                done <- c.OVNNbClient.Echo(context.Background())
×
1133
        }()
×
1134
        go func() {
×
1135
                done <- c.OVNSbClient.Echo(context.Background())
×
1136
        }()
×
1137

1138
        resultsReceived := 0
×
1139
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1140

×
1141
        for resultsReceived < 2 {
×
1142
                select {
×
1143
                case err := <-done:
×
1144
                        resultsReceived++
×
1145
                        if err != nil {
×
1146
                                c.dbFailureCount++
×
1147
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1148
                                if c.dbFailureCount >= maxFailures {
×
1149
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
1150
                                }
×
1151
                                return
×
1152
                        }
1153
                case <-timeout:
×
1154
                        c.dbFailureCount++
×
1155
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1156
                        if c.dbFailureCount >= maxFailures {
×
1157
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
1158
                        }
×
1159
                        return
×
1160
                }
1161
        }
1162

1163
        if c.dbFailureCount > 0 {
×
1164
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
1165
                c.dbFailureCount = 0
×
1166
        }
×
1167
}
1168

1169
func (c *Controller) shutdown() {
×
1170
        utilruntime.HandleCrash()
×
1171

×
1172
        c.addOrUpdatePodQueue.ShutDown()
×
1173
        c.deletePodQueue.ShutDown()
×
1174
        c.updatePodSecurityQueue.ShutDown()
×
1175

×
1176
        c.addNamespaceQueue.ShutDown()
×
1177

×
1178
        c.addOrUpdateSubnetQueue.ShutDown()
×
1179
        c.deleteSubnetQueue.ShutDown()
×
1180
        c.updateSubnetStatusQueue.ShutDown()
×
1181
        c.syncVirtualPortsQueue.ShutDown()
×
1182

×
1183
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1184
        c.updateIPPoolStatusQueue.ShutDown()
×
1185
        c.deleteIPPoolQueue.ShutDown()
×
1186

×
1187
        c.addNodeQueue.ShutDown()
×
1188
        c.updateNodeQueue.ShutDown()
×
1189
        c.deleteNodeQueue.ShutDown()
×
1190

×
1191
        c.addServiceQueue.ShutDown()
×
1192
        c.deleteServiceQueue.ShutDown()
×
1193
        c.updateServiceQueue.ShutDown()
×
1194
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1195

×
1196
        c.addVlanQueue.ShutDown()
×
1197
        c.delVlanQueue.ShutDown()
×
1198
        c.updateVlanQueue.ShutDown()
×
1199

×
1200
        c.addOrUpdateVpcQueue.ShutDown()
×
1201
        c.updateVpcStatusQueue.ShutDown()
×
1202
        c.delVpcQueue.ShutDown()
×
1203

×
1204
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1205
        c.initVpcNatGatewayQueue.ShutDown()
×
1206
        c.delVpcNatGatewayQueue.ShutDown()
×
1207
        c.updateVpcEipQueue.ShutDown()
×
1208
        c.updateVpcFloatingIPQueue.ShutDown()
×
1209
        c.updateVpcDnatQueue.ShutDown()
×
1210
        c.updateVpcSnatQueue.ShutDown()
×
1211
        c.updateVpcSubnetQueue.ShutDown()
×
1212

×
1213
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1214
        c.delVpcEgressGatewayQueue.ShutDown()
×
1215

×
1216
        if c.config.EnableLb {
×
1217
                c.addSwitchLBRuleQueue.ShutDown()
×
1218
                c.delSwitchLBRuleQueue.ShutDown()
×
1219
                c.updateSwitchLBRuleQueue.ShutDown()
×
1220

×
1221
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
1222
                c.delVpcDNSQueue.ShutDown()
×
1223
        }
×
1224

1225
        c.addIPQueue.ShutDown()
×
1226
        c.updateIPQueue.ShutDown()
×
1227
        c.delIPQueue.ShutDown()
×
1228

×
1229
        c.addVirtualIPQueue.ShutDown()
×
1230
        c.updateVirtualIPQueue.ShutDown()
×
1231
        c.updateVirtualParentsQueue.ShutDown()
×
1232
        c.delVirtualIPQueue.ShutDown()
×
1233

×
1234
        c.addIptablesEipQueue.ShutDown()
×
1235
        c.updateIptablesEipQueue.ShutDown()
×
1236
        c.resetIptablesEipQueue.ShutDown()
×
1237
        c.delIptablesEipQueue.ShutDown()
×
1238

×
1239
        c.addIptablesFipQueue.ShutDown()
×
1240
        c.updateIptablesFipQueue.ShutDown()
×
1241
        c.delIptablesFipQueue.ShutDown()
×
1242

×
1243
        c.addIptablesDnatRuleQueue.ShutDown()
×
1244
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1245
        c.delIptablesDnatRuleQueue.ShutDown()
×
1246

×
1247
        c.addIptablesSnatRuleQueue.ShutDown()
×
1248
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1249
        c.delIptablesSnatRuleQueue.ShutDown()
×
1250

×
1251
        c.addQoSPolicyQueue.ShutDown()
×
1252
        c.updateQoSPolicyQueue.ShutDown()
×
1253
        c.delQoSPolicyQueue.ShutDown()
×
1254

×
1255
        c.addOvnEipQueue.ShutDown()
×
1256
        c.updateOvnEipQueue.ShutDown()
×
1257
        c.resetOvnEipQueue.ShutDown()
×
1258
        c.delOvnEipQueue.ShutDown()
×
1259

×
1260
        c.addOvnFipQueue.ShutDown()
×
1261
        c.updateOvnFipQueue.ShutDown()
×
1262
        c.delOvnFipQueue.ShutDown()
×
1263

×
1264
        c.addOvnSnatRuleQueue.ShutDown()
×
1265
        c.updateOvnSnatRuleQueue.ShutDown()
×
1266
        c.delOvnSnatRuleQueue.ShutDown()
×
1267

×
1268
        c.addOvnDnatRuleQueue.ShutDown()
×
1269
        c.updateOvnDnatRuleQueue.ShutDown()
×
1270
        c.delOvnDnatRuleQueue.ShutDown()
×
1271

×
1272
        if c.config.EnableNP {
×
1273
                c.updateNpQueue.ShutDown()
×
1274
                c.deleteNpQueue.ShutDown()
×
1275
        }
×
1276
        if c.config.EnableANP {
×
1277
                c.addAnpQueue.ShutDown()
×
1278
                c.updateAnpQueue.ShutDown()
×
1279
                c.deleteAnpQueue.ShutDown()
×
1280

×
1281
                c.addBanpQueue.ShutDown()
×
1282
                c.updateBanpQueue.ShutDown()
×
1283
                c.deleteBanpQueue.ShutDown()
×
1284

×
1285
                c.addCnpQueue.ShutDown()
×
1286
                c.updateCnpQueue.ShutDown()
×
1287
                c.deleteCnpQueue.ShutDown()
×
1288
        }
×
1289

1290
        if c.config.EnableDNSNameResolver {
×
1291
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
1292
                c.deleteDNSNameResolverQueue.ShutDown()
×
1293
        }
×
1294

1295
        c.addOrUpdateSgQueue.ShutDown()
×
1296
        c.delSgQueue.ShutDown()
×
1297
        c.syncSgPortsQueue.ShutDown()
×
1298

×
1299
        c.addOrUpdateCsrQueue.ShutDown()
×
1300

×
1301
        if c.config.EnableLiveMigrationOptimize {
×
1302
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
1303
        }
×
1304
}
1305

1306
func (c *Controller) startWorkers(ctx context.Context) {
×
1307
        klog.Info("Starting workers")
×
1308

×
1309
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1310
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1311
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1312

×
1313
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1314
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1315
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1316
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1317
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1318
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1319
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1320
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1321
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1322
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1323
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1324
        // add default and join subnet and wait them ready
×
1325
        for range c.config.WorkerNum {
×
1326
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1327
        }
×
1328
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1329
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1330
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1331
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1332
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1333
                klog.Infof("wait for subnets %v ready", subnets)
×
1334

×
1335
                return c.allSubnetReady(subnets...)
×
1336
        })
×
1337
        if err != nil {
×
1338
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1339
        }
×
1340

1341
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1342
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1343
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1344

×
1345
        // run node worker before handle any pods
×
1346
        for range c.config.WorkerNum {
×
1347
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1348
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1349
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1350
        }
×
1351
        for {
×
1352
                ready := true
×
1353
                time.Sleep(3 * time.Second)
×
1354
                nodes, err := c.nodesLister.List(labels.Everything())
×
1355
                if err != nil {
×
1356
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1357
                }
×
1358
                for _, node := range nodes {
×
1359
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1360
                                klog.Infof("wait node %s annotation ready", node.Name)
×
1361
                                ready = false
×
1362
                                break
×
1363
                        }
1364
                }
1365
                if ready {
×
1366
                        break
×
1367
                }
1368
        }
1369

1370
        if c.config.EnableLb {
×
1371
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1372
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1373
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1374

×
1375
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1376
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1377
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1378

×
1379
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1380
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1381
                go wait.Until(func() {
×
1382
                        c.resyncVpcDNSConfig()
×
1383
                }, 5*time.Second, ctx.Done())
×
1384
        }
1385

1386
        for range c.config.WorkerNum {
×
1387
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1388
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1389
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1390

×
1391
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1392
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1393
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1394
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1395
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1396

×
1397
                if c.config.EnableLb {
×
1398
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
1399
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1400
                }
×
1401

1402
                if c.config.EnableNP {
×
1403
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
1404
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1405
                }
×
1406

1407
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
1408
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1409
        }
1410

1411
        if c.config.EnableEipSnat {
×
1412
                go wait.Until(func() {
×
1413
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
1414
                        c.resyncExternalGateway()
×
1415
                }, time.Second, ctx.Done())
×
1416

1417
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
1418
                c.OVNNbClient.MonitorBFD()
×
1419
        }
1420
        // TODO: we should merge these two vpc nat config into one config and resync them together
1421
        go wait.Until(func() {
×
1422
                c.resyncVpcNatGwConfig()
×
1423
        }, time.Second, ctx.Done())
×
1424

1425
        go wait.Until(func() {
×
1426
                c.resyncVpcNatConfig()
×
1427
        }, time.Second, ctx.Done())
×
1428

1429
        if c.config.GCInterval != 0 {
×
1430
                go wait.Until(func() {
×
1431
                        if err := c.markAndCleanLSP(); err != nil {
×
1432
                                klog.Errorf("gc lsp error: %v", err)
×
1433
                        }
×
1434
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1435
        }
1436

1437
        go wait.Until(func() {
×
1438
                if err := c.inspectPod(); err != nil {
×
1439
                        klog.Errorf("inspection error: %v", err)
×
1440
                }
×
1441
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1442

1443
        if c.config.EnableExternalVpc {
×
1444
                go wait.Until(func() {
×
1445
                        c.syncExternalVpc()
×
1446
                }, 5*time.Second, ctx.Done())
×
1447
        }
1448

1449
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1450
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1451
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1452
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1453

×
1454
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1455
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1456
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1457
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1458

×
1459
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1460
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1461
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1462

×
1463
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1464
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1465
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1466

×
1467
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1468
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1469
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1470

×
1471
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1472

×
1473
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1474
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1475
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1476

×
1477
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1478
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1479
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1480
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1481

×
1482
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1483
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1484
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1485
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1486

×
1487
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1488
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1489
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1490

×
1491
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1492
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1493
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1494

×
1495
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1496
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1497
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1498

×
1499
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1500
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1501
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1502

×
1503
        if c.config.EnableANP {
×
1504
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1505
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1506
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1507

×
1508
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1509
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1510
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1511

×
1512
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1513
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
1514
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1515
        }
×
1516

1517
        if c.config.EnableDNSNameResolver {
×
1518
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
1519
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1520
        }
×
1521

1522
        if c.config.EnableLiveMigrationOptimize {
×
1523
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1524
        }
×
1525

1526
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
1527

×
1528
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1529
}
1530

1531
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
1✔
1532
        for _, lsName := range subnets {
2✔
1533
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
1✔
1534
                if err != nil {
1✔
1535
                        klog.Error(err)
×
1536
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
1537
                }
×
1538

1539
                if !exist {
2✔
1540
                        return false, nil
1✔
1541
                }
1✔
1542
        }
1543

1544
        return true, nil
1✔
1545
}
1546

1547
func (c *Controller) initResourceOnce() {
×
1548
        c.registerSubnetMetrics()
×
1549

×
1550
        if err := c.initNodeChassis(); err != nil {
×
1551
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1552
        }
×
1553

1554
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1555
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1556
        }
×
1557
        if err := c.syncSecurityGroup(); err != nil {
×
1558
                util.LogFatalAndExit(err, "failed to sync security group")
×
1559
        }
×
1560

1561
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
1562
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1563
        }
×
1564

1565
        if err := c.initVpcNatGw(); err != nil {
×
1566
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1567
        }
×
1568
        if c.config.EnableLb {
×
1569
                if err := c.initVpcDNSConfig(); err != nil {
×
1570
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
1571
                }
×
1572
        }
1573

1574
        // remove resources in ovndb that not exist any more in kubernetes resources
1575
        // process gc at last in case of affecting other init process
1576
        if err := c.gc(); err != nil {
×
1577
                util.LogFatalAndExit(err, "failed to run gc")
×
1578
        }
×
1579
}
1580

1581
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1582
        item, shutdown := queue.Get()
×
1583
        if shutdown {
×
1584
                return false
×
1585
        }
×
1586

1587
        err := func(item T) error {
×
1588
                defer queue.Done(item)
×
1589
                if err := handler(item); err != nil {
×
1590
                        queue.AddRateLimited(item)
×
1591
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1592
                }
×
1593
                queue.Forget(item)
×
1594
                return nil
×
1595
        }(item)
1596
        if err != nil {
×
1597
                utilruntime.HandleError(err)
×
1598
                return true
×
1599
        }
×
1600
        return true
×
1601
}
1602

1603
func getWorkItemKey(obj any) string {
×
1604
        switch v := obj.(type) {
×
1605
        case string:
×
1606
                return v
×
1607
        case *vpcService:
×
1608
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1609
        case *AdminNetworkPolicyChangedDelta:
×
1610
                return v.key
×
1611
        case *SwitchLBRuleInfo:
×
1612
                return v.Name
×
1613
        default:
×
1614
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1615
                if err != nil {
×
1616
                        utilruntime.HandleError(err)
×
1617
                        return ""
×
1618
                }
×
1619
                return key
×
1620
        }
1621
}
1622

1623
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1624
        return func() {
×
1625
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
1626
                }
×
1627
        }
1628
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc