• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 23781674255

31 Mar 2026 05:15AM UTC coverage: 23.491% (+0.1%) from 23.369%
23781674255

push

github

oilbeater
fix(ha): adapt OvnDatabaseControl call for release-1.15 compatibility

Use exec.Command directly instead of ovs.OvnDatabaseControl which
does not exist on this branch.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

0 of 3 new or added lines in 1 file covered. (0.0%)

1295 existing lines in 10 files now uncovered.

12793 of 54458 relevant lines covered (23.49%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.15
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        netAttach "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
13
        netAttachv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
14
        "github.com/puzpuzpuz/xsync/v4"
15
        "golang.org/x/time/rate"
16
        corev1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        "k8s.io/client-go/discovery"
23
        kubeinformers "k8s.io/client-go/informers"
24
        "k8s.io/client-go/kubernetes/scheme"
25
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
26
        appsv1 "k8s.io/client-go/listers/apps/v1"
27
        certListerv1 "k8s.io/client-go/listers/certificates/v1"
28
        v1 "k8s.io/client-go/listers/core/v1"
29
        discoveryv1 "k8s.io/client-go/listers/discovery/v1"
30
        netv1 "k8s.io/client-go/listers/networking/v1"
31
        "k8s.io/client-go/tools/cache"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/util/workqueue"
34
        "k8s.io/klog/v2"
35
        "k8s.io/utils/keymutex"
36
        "k8s.io/utils/set"
37
        v1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
38
        netpolv1alpha2 "sigs.k8s.io/network-policy-api/apis/v1alpha2"
39
        anpinformer "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
40
        anplister "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha1"
41
        anplisterv1alpha2 "sigs.k8s.io/network-policy-api/pkg/client/listers/apis/v1alpha2"
42

43
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
44
        kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
45
        kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
46
        "github.com/kubeovn/kube-ovn/pkg/informer"
47
        ovnipam "github.com/kubeovn/kube-ovn/pkg/ipam"
48
        "github.com/kubeovn/kube-ovn/pkg/ovs"
49
        "github.com/kubeovn/kube-ovn/pkg/util"
50
)
51

52
const controllerAgentName = "kube-ovn-controller"
53

54
const (
55
        logicalSwitchKey              = "ls"
56
        logicalRouterKey              = "lr"
57
        portGroupKey                  = "pg"
58
        networkPolicyKey              = "np"
59
        sgKey                         = "sg"
60
        sgsKey                        = "security_groups"
61
        u2oKey                        = "u2o"
62
        adminNetworkPolicyKey         = "anp"
63
        baselineAdminNetworkPolicyKey = "banp"
64
        ippoolKey                     = "ippool"
65
        clusterNetworkPolicyKey       = "cnp"
66
)
67

68
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
69
type Controller struct {
70
        config *Configuration
71

72
        ipam             *ovnipam.IPAM
73
        namedPort        *NamedPort
74
        anpPrioNameMap   map[int32]string
75
        anpNamePrioMap   map[string]int32
76
        bnpPrioNameMap   map[int32]string
77
        bnpNamePrioMap   map[string]int32
78
        priorityMapMutex sync.RWMutex
79

80
        OVNNbClient ovs.NbClient
81
        OVNSbClient ovs.SbClient
82

83
        // ExternalGatewayType define external gateway type, centralized
84
        ExternalGatewayType string
85

86
        podsLister             v1.PodLister
87
        podsSynced             cache.InformerSynced
88
        addOrUpdatePodQueue    workqueue.TypedRateLimitingInterface[string]
89
        deletePodQueue         workqueue.TypedRateLimitingInterface[string]
90
        deletingPodObjMap      *xsync.Map[string, *corev1.Pod]
91
        deletingNodeObjMap     *xsync.Map[string, *corev1.Node]
92
        updatePodSecurityQueue workqueue.TypedRateLimitingInterface[string]
93
        podKeyMutex            keymutex.KeyMutex
94

95
        vpcsLister           kubeovnlister.VpcLister
96
        vpcSynced            cache.InformerSynced
97
        addOrUpdateVpcQueue  workqueue.TypedRateLimitingInterface[string]
98
        vpcLastPoliciesMap   *xsync.Map[string, string]
99
        delVpcQueue          workqueue.TypedRateLimitingInterface[*kubeovnv1.Vpc]
100
        updateVpcStatusQueue workqueue.TypedRateLimitingInterface[string]
101
        vpcKeyMutex          keymutex.KeyMutex
102

103
        vpcNatGatewayLister           kubeovnlister.VpcNatGatewayLister
104
        vpcNatGatewaySynced           cache.InformerSynced
105
        addOrUpdateVpcNatGatewayQueue workqueue.TypedRateLimitingInterface[string]
106
        delVpcNatGatewayQueue         workqueue.TypedRateLimitingInterface[string]
107
        initVpcNatGatewayQueue        workqueue.TypedRateLimitingInterface[string]
108
        updateVpcEipQueue             workqueue.TypedRateLimitingInterface[string]
109
        updateVpcFloatingIPQueue      workqueue.TypedRateLimitingInterface[string]
110
        updateVpcDnatQueue            workqueue.TypedRateLimitingInterface[string]
111
        updateVpcSnatQueue            workqueue.TypedRateLimitingInterface[string]
112
        updateVpcSubnetQueue          workqueue.TypedRateLimitingInterface[string]
113
        vpcNatGwKeyMutex              keymutex.KeyMutex
114
        vpcNatGwExecKeyMutex          keymutex.KeyMutex
115

116
        vpcEgressGatewayLister           kubeovnlister.VpcEgressGatewayLister
117
        vpcEgressGatewaySynced           cache.InformerSynced
118
        addOrUpdateVpcEgressGatewayQueue workqueue.TypedRateLimitingInterface[string]
119
        delVpcEgressGatewayQueue         workqueue.TypedRateLimitingInterface[string]
120
        vpcEgressGatewayKeyMutex         keymutex.KeyMutex
121

122
        switchLBRuleLister      kubeovnlister.SwitchLBRuleLister
123
        switchLBRuleSynced      cache.InformerSynced
124
        addSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[string]
125
        updateSwitchLBRuleQueue workqueue.TypedRateLimitingInterface[*SlrInfo]
126
        delSwitchLBRuleQueue    workqueue.TypedRateLimitingInterface[*SlrInfo]
127

128
        vpcDNSLister           kubeovnlister.VpcDnsLister
129
        vpcDNSSynced           cache.InformerSynced
130
        addOrUpdateVpcDNSQueue workqueue.TypedRateLimitingInterface[string]
131
        delVpcDNSQueue         workqueue.TypedRateLimitingInterface[string]
132

133
        subnetsLister           kubeovnlister.SubnetLister
134
        subnetSynced            cache.InformerSynced
135
        addOrUpdateSubnetQueue  workqueue.TypedRateLimitingInterface[string]
136
        deleteSubnetQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.Subnet]
137
        updateSubnetStatusQueue workqueue.TypedRateLimitingInterface[string]
138
        syncVirtualPortsQueue   workqueue.TypedRateLimitingInterface[string]
139
        subnetKeyMutex          keymutex.KeyMutex
140

141
        ippoolLister            kubeovnlister.IPPoolLister
142
        ippoolSynced            cache.InformerSynced
143
        addOrUpdateIPPoolQueue  workqueue.TypedRateLimitingInterface[string]
144
        updateIPPoolStatusQueue workqueue.TypedRateLimitingInterface[string]
145
        deleteIPPoolQueue       workqueue.TypedRateLimitingInterface[*kubeovnv1.IPPool]
146
        ippoolKeyMutex          keymutex.KeyMutex
147

148
        ipsLister     kubeovnlister.IPLister
149
        ipSynced      cache.InformerSynced
150
        addIPQueue    workqueue.TypedRateLimitingInterface[string]
151
        updateIPQueue workqueue.TypedRateLimitingInterface[string]
152
        delIPQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IP]
153

154
        virtualIpsLister          kubeovnlister.VipLister
155
        virtualIpsSynced          cache.InformerSynced
156
        addVirtualIPQueue         workqueue.TypedRateLimitingInterface[string]
157
        updateVirtualIPQueue      workqueue.TypedRateLimitingInterface[string]
158
        updateVirtualParentsQueue workqueue.TypedRateLimitingInterface[string]
159
        delVirtualIPQueue         workqueue.TypedRateLimitingInterface[*kubeovnv1.Vip]
160

161
        iptablesEipsLister     kubeovnlister.IptablesEIPLister
162
        iptablesEipSynced      cache.InformerSynced
163
        addIptablesEipQueue    workqueue.TypedRateLimitingInterface[string]
164
        updateIptablesEipQueue workqueue.TypedRateLimitingInterface[string]
165
        resetIptablesEipQueue  workqueue.TypedRateLimitingInterface[string]
166
        delIptablesEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.IptablesEIP]
167

168
        iptablesFipsLister     kubeovnlister.IptablesFIPRuleLister
169
        iptablesFipSynced      cache.InformerSynced
170
        addIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
171
        updateIptablesFipQueue workqueue.TypedRateLimitingInterface[string]
172
        delIptablesFipQueue    workqueue.TypedRateLimitingInterface[string]
173

174
        iptablesDnatRulesLister     kubeovnlister.IptablesDnatRuleLister
175
        iptablesDnatRuleSynced      cache.InformerSynced
176
        addIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
177
        updateIptablesDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
178
        delIptablesDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
179

180
        iptablesSnatRulesLister     kubeovnlister.IptablesSnatRuleLister
181
        iptablesSnatRuleSynced      cache.InformerSynced
182
        addIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
183
        updateIptablesSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
184
        delIptablesSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
185

186
        ovnEipsLister     kubeovnlister.OvnEipLister
187
        ovnEipSynced      cache.InformerSynced
188
        addOvnEipQueue    workqueue.TypedRateLimitingInterface[string]
189
        updateOvnEipQueue workqueue.TypedRateLimitingInterface[string]
190
        resetOvnEipQueue  workqueue.TypedRateLimitingInterface[string]
191
        delOvnEipQueue    workqueue.TypedRateLimitingInterface[*kubeovnv1.OvnEip]
192

193
        ovnFipsLister     kubeovnlister.OvnFipLister
194
        ovnFipSynced      cache.InformerSynced
195
        addOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
196
        updateOvnFipQueue workqueue.TypedRateLimitingInterface[string]
197
        delOvnFipQueue    workqueue.TypedRateLimitingInterface[string]
198

199
        ovnSnatRulesLister     kubeovnlister.OvnSnatRuleLister
200
        ovnSnatRuleSynced      cache.InformerSynced
201
        addOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
202
        updateOvnSnatRuleQueue workqueue.TypedRateLimitingInterface[string]
203
        delOvnSnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
204

205
        ovnDnatRulesLister     kubeovnlister.OvnDnatRuleLister
206
        ovnDnatRuleSynced      cache.InformerSynced
207
        addOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
208
        updateOvnDnatRuleQueue workqueue.TypedRateLimitingInterface[string]
209
        delOvnDnatRuleQueue    workqueue.TypedRateLimitingInterface[string]
210

211
        providerNetworksLister kubeovnlister.ProviderNetworkLister
212
        providerNetworkSynced  cache.InformerSynced
213

214
        vlansLister     kubeovnlister.VlanLister
215
        vlanSynced      cache.InformerSynced
216
        addVlanQueue    workqueue.TypedRateLimitingInterface[string]
217
        delVlanQueue    workqueue.TypedRateLimitingInterface[string]
218
        updateVlanQueue workqueue.TypedRateLimitingInterface[string]
219
        vlanKeyMutex    keymutex.KeyMutex
220

221
        namespacesLister  v1.NamespaceLister
222
        namespacesSynced  cache.InformerSynced
223
        addNamespaceQueue workqueue.TypedRateLimitingInterface[string]
224
        nsKeyMutex        keymutex.KeyMutex
225

226
        nodesLister     v1.NodeLister
227
        nodesSynced     cache.InformerSynced
228
        addNodeQueue    workqueue.TypedRateLimitingInterface[string]
229
        updateNodeQueue workqueue.TypedRateLimitingInterface[string]
230
        deleteNodeQueue workqueue.TypedRateLimitingInterface[string]
231
        nodeKeyMutex    keymutex.KeyMutex
232

233
        servicesLister     v1.ServiceLister
234
        serviceSynced      cache.InformerSynced
235
        addServiceQueue    workqueue.TypedRateLimitingInterface[string]
236
        deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
237
        updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
238
        svcKeyMutex        keymutex.KeyMutex
239

240
        endpointSlicesLister          discoveryv1.EndpointSliceLister
241
        endpointSlicesSynced          cache.InformerSynced
242
        addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
243
        epKeyMutex                    keymutex.KeyMutex
244

245
        deploymentsLister appsv1.DeploymentLister
246
        deploymentsSynced cache.InformerSynced
247

248
        npsLister     netv1.NetworkPolicyLister
249
        npsSynced     cache.InformerSynced
250
        updateNpQueue workqueue.TypedRateLimitingInterface[string]
251
        deleteNpQueue workqueue.TypedRateLimitingInterface[string]
252
        npKeyMutex    keymutex.KeyMutex
253

254
        sgsLister          kubeovnlister.SecurityGroupLister
255
        sgSynced           cache.InformerSynced
256
        addOrUpdateSgQueue workqueue.TypedRateLimitingInterface[string]
257
        delSgQueue         workqueue.TypedRateLimitingInterface[string]
258
        syncSgPortsQueue   workqueue.TypedRateLimitingInterface[string]
259
        sgKeyMutex         keymutex.KeyMutex
260

261
        qosPoliciesLister    kubeovnlister.QoSPolicyLister
262
        qosPolicySynced      cache.InformerSynced
263
        addQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
264
        updateQoSPolicyQueue workqueue.TypedRateLimitingInterface[string]
265
        delQoSPolicyQueue    workqueue.TypedRateLimitingInterface[string]
266

267
        configMapsLister v1.ConfigMapLister
268
        configMapsSynced cache.InformerSynced
269

270
        anpsLister     anplister.AdminNetworkPolicyLister
271
        anpsSynced     cache.InformerSynced
272
        addAnpQueue    workqueue.TypedRateLimitingInterface[string]
273
        updateAnpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
274
        deleteAnpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.AdminNetworkPolicy]
275
        anpKeyMutex    keymutex.KeyMutex
276

277
        dnsNameResolversLister          kubeovnlister.DNSNameResolverLister
278
        dnsNameResolversSynced          cache.InformerSynced
279
        addOrUpdateDNSNameResolverQueue workqueue.TypedRateLimitingInterface[string]
280
        deleteDNSNameResolverQueue      workqueue.TypedRateLimitingInterface[*kubeovnv1.DNSNameResolver]
281

282
        banpsLister     anplister.BaselineAdminNetworkPolicyLister
283
        banpsSynced     cache.InformerSynced
284
        addBanpQueue    workqueue.TypedRateLimitingInterface[string]
285
        updateBanpQueue workqueue.TypedRateLimitingInterface[*AdminNetworkPolicyChangedDelta]
286
        deleteBanpQueue workqueue.TypedRateLimitingInterface[*v1alpha1.BaselineAdminNetworkPolicy]
287
        banpKeyMutex    keymutex.KeyMutex
288

289
        cnpsLister     anplisterv1alpha2.ClusterNetworkPolicyLister
290
        cnpsSynced     cache.InformerSynced
291
        addCnpQueue    workqueue.TypedRateLimitingInterface[string]
292
        updateCnpQueue workqueue.TypedRateLimitingInterface[*ClusterNetworkPolicyChangedDelta]
293
        deleteCnpQueue workqueue.TypedRateLimitingInterface[*netpolv1alpha2.ClusterNetworkPolicy]
294
        cnpKeyMutex    keymutex.KeyMutex
295

296
        csrLister           certListerv1.CertificateSigningRequestLister
297
        csrSynced           cache.InformerSynced
298
        addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]
299

300
        addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
301
        deleteVMQueue                workqueue.TypedRateLimitingInterface[string]
302
        kubevirtInformerFactory      informer.KubeVirtInformerFactory
303

304
        netAttachLister          netAttachv1.NetworkAttachmentDefinitionLister
305
        netAttachSynced          cache.InformerSynced
306
        netAttachInformerFactory netAttach.SharedInformerFactory
307

308
        recorder               record.EventRecorder
309
        informerFactory        kubeinformers.SharedInformerFactory
310
        cmInformerFactory      kubeinformers.SharedInformerFactory
311
        deployInformerFactory  kubeinformers.SharedInformerFactory
312
        kubeovnInformerFactory kubeovninformer.SharedInformerFactory
313
        anpInformerFactory     anpinformer.SharedInformerFactory
314

315
        // Database health check
316
        dbFailureCount int
317

318
        distributedSubnetNeedSync atomic.Bool
319
}
320

321
func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] {
2✔
322
        if rateLimiter == nil {
4✔
323
                rateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]()
2✔
324
        }
2✔
325
        return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{Name: name})
2✔
326
}
327

328
// Run creates and runs a new ovn controller
329
func Run(ctx context.Context, config *Configuration) {
×
330
        klog.V(4).Info("Creating event broadcaster")
×
331
        eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{BurstSize: 100})
×
332
        eventBroadcaster.StartLogging(klog.Infof)
×
333
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
×
334
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
335
        custCrdRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
×
336
                workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
337
                &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
338
        )
×
339

×
340
        selector, err := labels.Parse(util.VpcEgressGatewayLabel)
×
341
        if err != nil {
×
UNCOV
342
                util.LogFatalAndExit(err, "failed to create label selector for vpc egress gateway workload")
×
343
        }
×
344

345
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
346
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
347
                        listOption.AllowWatchBookmarks = true
×
348
                }))
×
349
        cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
350
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
UNCOV
351
                        listOption.AllowWatchBookmarks = true
×
352
                }), kubeinformers.WithNamespace(config.PodNamespace))
×
353
        // deployment informer used to list/watch vpc egress gateway workloads
354
        deployInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeFactoryClient, 0,
×
355
                kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
356
                        listOption.AllowWatchBookmarks = true
×
357
                        listOption.LabelSelector = selector.String()
×
358
                }))
×
359
        kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnFactoryClient, 0,
×
360
                kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
361
                        listOption.AllowWatchBookmarks = true
×
362
                }))
×
363
        anpInformerFactory := anpinformer.NewSharedInformerFactoryWithOptions(config.AnpClient, 0,
×
364
                anpinformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
UNCOV
365
                        listOption.AllowWatchBookmarks = true
×
366
                }))
×
367

368
        attachNetInformerFactory := netAttach.NewSharedInformerFactoryWithOptions(config.AttachNetClient, 0,
×
369
                netAttach.WithTweakListOptions(func(listOption *metav1.ListOptions) {
×
UNCOV
370
                        listOption.AllowWatchBookmarks = true
×
UNCOV
371
                }),
×
372
        )
373

374
        kubevirtInformerFactory := informer.NewKubeVirtInformerFactory(config.KubevirtClient.RestClient(), config.KubevirtClient, nil, util.KubevirtNamespace)
×
375

×
376
        vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
×
377
        vpcNatGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways()
×
378
        vpcEgressGatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcEgressGateways()
×
379
        subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
×
380
        ippoolInformer := kubeovnInformerFactory.Kubeovn().V1().IPPools()
×
381
        ipInformer := kubeovnInformerFactory.Kubeovn().V1().IPs()
×
382
        virtualIPInformer := kubeovnInformerFactory.Kubeovn().V1().Vips()
×
383
        iptablesEipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs()
×
384
        iptablesFipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesFIPRules()
×
385
        iptablesDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesDnatRules()
×
386
        iptablesSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesSnatRules()
×
387
        vlanInformer := kubeovnInformerFactory.Kubeovn().V1().Vlans()
×
388
        providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks()
×
389
        sgInformer := kubeovnInformerFactory.Kubeovn().V1().SecurityGroups()
×
390
        podInformer := informerFactory.Core().V1().Pods()
×
391
        namespaceInformer := informerFactory.Core().V1().Namespaces()
×
392
        nodeInformer := informerFactory.Core().V1().Nodes()
×
393
        serviceInformer := informerFactory.Core().V1().Services()
×
394
        endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
×
395
        deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
×
396
        qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
×
397
        configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
×
398
        npInformer := informerFactory.Networking().V1().NetworkPolicies()
×
399
        switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules()
×
400
        vpcDNSInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses()
×
401
        ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips()
×
402
        ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips()
×
403
        ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
×
404
        ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()
×
405
        anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
×
406
        banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
×
407
        cnpInformer := anpInformerFactory.Policy().V1alpha2().ClusterNetworkPolicies()
×
408
        dnsNameResolverInformer := kubeovnInformerFactory.Kubeovn().V1().DNSNameResolvers()
×
409
        csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
×
410
        netAttachInformer := attachNetInformerFactory.K8sCniCncfIo().V1().NetworkAttachmentDefinitions()
×
411

×
412
        numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
×
413
        controller := &Controller{
×
414
                config:             config,
×
415
                deletingPodObjMap:  xsync.NewMap[string, *corev1.Pod](),
×
416
                deletingNodeObjMap: xsync.NewMap[string, *corev1.Node](),
×
417
                ipam:               ovnipam.NewIPAM(),
×
418
                namedPort:          NewNamedPort(),
×
419

×
420
                vpcsLister:           vpcInformer.Lister(),
×
421
                vpcSynced:            vpcInformer.Informer().HasSynced,
×
422
                addOrUpdateVpcQueue:  newTypedRateLimitingQueue[string]("AddOrUpdateVpc", nil),
×
423
                vpcLastPoliciesMap:   xsync.NewMap[string, string](),
×
424
                delVpcQueue:          newTypedRateLimitingQueue[*kubeovnv1.Vpc]("DeleteVpc", nil),
×
425
                updateVpcStatusQueue: newTypedRateLimitingQueue[string]("UpdateVpcStatus", nil),
×
426
                vpcKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
427

×
428
                vpcNatGatewayLister:              vpcNatGatewayInformer.Lister(),
×
429
                vpcNatGatewaySynced:              vpcNatGatewayInformer.Informer().HasSynced,
×
430
                addOrUpdateVpcNatGatewayQueue:    newTypedRateLimitingQueue("AddOrUpdateVpcNatGw", custCrdRateLimiter),
×
431
                initVpcNatGatewayQueue:           newTypedRateLimitingQueue("InitVpcNatGw", custCrdRateLimiter),
×
432
                delVpcNatGatewayQueue:            newTypedRateLimitingQueue("DeleteVpcNatGw", custCrdRateLimiter),
×
433
                updateVpcEipQueue:                newTypedRateLimitingQueue("UpdateVpcEip", custCrdRateLimiter),
×
434
                updateVpcFloatingIPQueue:         newTypedRateLimitingQueue("UpdateVpcFloatingIp", custCrdRateLimiter),
×
435
                updateVpcDnatQueue:               newTypedRateLimitingQueue("UpdateVpcDnat", custCrdRateLimiter),
×
436
                updateVpcSnatQueue:               newTypedRateLimitingQueue("UpdateVpcSnat", custCrdRateLimiter),
×
437
                updateVpcSubnetQueue:             newTypedRateLimitingQueue("UpdateVpcSubnet", custCrdRateLimiter),
×
438
                vpcNatGwKeyMutex:                 keymutex.NewHashed(numKeyLocks),
×
439
                vpcNatGwExecKeyMutex:             keymutex.NewHashed(numKeyLocks),
×
440
                vpcEgressGatewayLister:           vpcEgressGatewayInformer.Lister(),
×
441
                vpcEgressGatewaySynced:           vpcEgressGatewayInformer.Informer().HasSynced,
×
442
                addOrUpdateVpcEgressGatewayQueue: newTypedRateLimitingQueue("AddOrUpdateVpcEgressGateway", custCrdRateLimiter),
×
443
                delVpcEgressGatewayQueue:         newTypedRateLimitingQueue("DeleteVpcEgressGateway", custCrdRateLimiter),
×
444
                vpcEgressGatewayKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
445

×
446
                subnetsLister:           subnetInformer.Lister(),
×
447
                subnetSynced:            subnetInformer.Informer().HasSynced,
×
448
                addOrUpdateSubnetQueue:  newTypedRateLimitingQueue[string]("AddSubnet", nil),
×
449
                deleteSubnetQueue:       newTypedRateLimitingQueue[*kubeovnv1.Subnet]("DeleteSubnet", nil),
×
450
                updateSubnetStatusQueue: newTypedRateLimitingQueue[string]("UpdateSubnetStatus", nil),
×
451
                syncVirtualPortsQueue:   newTypedRateLimitingQueue[string]("SyncVirtualPort", nil),
×
452
                subnetKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
453

×
454
                ippoolLister:            ippoolInformer.Lister(),
×
455
                ippoolSynced:            ippoolInformer.Informer().HasSynced,
×
456
                addOrUpdateIPPoolQueue:  newTypedRateLimitingQueue[string]("AddIPPool", nil),
×
457
                updateIPPoolStatusQueue: newTypedRateLimitingQueue[string]("UpdateIPPoolStatus", nil),
×
458
                deleteIPPoolQueue:       newTypedRateLimitingQueue[*kubeovnv1.IPPool]("DeleteIPPool", nil),
×
459
                ippoolKeyMutex:          keymutex.NewHashed(numKeyLocks),
×
460

×
461
                ipsLister:     ipInformer.Lister(),
×
462
                ipSynced:      ipInformer.Informer().HasSynced,
×
463
                addIPQueue:    newTypedRateLimitingQueue[string]("AddIP", nil),
×
464
                updateIPQueue: newTypedRateLimitingQueue[string]("UpdateIP", nil),
×
465
                delIPQueue:    newTypedRateLimitingQueue[*kubeovnv1.IP]("DeleteIP", nil),
×
466

×
467
                virtualIpsLister:          virtualIPInformer.Lister(),
×
468
                virtualIpsSynced:          virtualIPInformer.Informer().HasSynced,
×
469
                addVirtualIPQueue:         newTypedRateLimitingQueue[string]("AddVirtualIP", nil),
×
470
                updateVirtualIPQueue:      newTypedRateLimitingQueue[string]("UpdateVirtualIP", nil),
×
471
                updateVirtualParentsQueue: newTypedRateLimitingQueue[string]("UpdateVirtualParents", nil),
×
472
                delVirtualIPQueue:         newTypedRateLimitingQueue[*kubeovnv1.Vip]("DeleteVirtualIP", nil),
×
473

×
474
                iptablesEipsLister:     iptablesEipInformer.Lister(),
×
475
                iptablesEipSynced:      iptablesEipInformer.Informer().HasSynced,
×
476
                addIptablesEipQueue:    newTypedRateLimitingQueue("AddIptablesEip", custCrdRateLimiter),
×
477
                updateIptablesEipQueue: newTypedRateLimitingQueue("UpdateIptablesEip", custCrdRateLimiter),
×
478
                resetIptablesEipQueue:  newTypedRateLimitingQueue("ResetIptablesEip", custCrdRateLimiter),
×
479
                delIptablesEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.IptablesEIP]("DeleteIptablesEip", nil),
×
480

×
481
                iptablesFipsLister:     iptablesFipInformer.Lister(),
×
482
                iptablesFipSynced:      iptablesFipInformer.Informer().HasSynced,
×
483
                addIptablesFipQueue:    newTypedRateLimitingQueue("AddIptablesFip", custCrdRateLimiter),
×
484
                updateIptablesFipQueue: newTypedRateLimitingQueue("UpdateIptablesFip", custCrdRateLimiter),
×
485
                delIptablesFipQueue:    newTypedRateLimitingQueue("DeleteIptablesFip", custCrdRateLimiter),
×
486

×
487
                iptablesDnatRulesLister:     iptablesDnatRuleInformer.Lister(),
×
488
                iptablesDnatRuleSynced:      iptablesDnatRuleInformer.Informer().HasSynced,
×
489
                addIptablesDnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesDnatRule", custCrdRateLimiter),
×
490
                updateIptablesDnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesDnatRule", custCrdRateLimiter),
×
491
                delIptablesDnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesDnatRule", custCrdRateLimiter),
×
492

×
493
                iptablesSnatRulesLister:     iptablesSnatRuleInformer.Lister(),
×
494
                iptablesSnatRuleSynced:      iptablesSnatRuleInformer.Informer().HasSynced,
×
495
                addIptablesSnatRuleQueue:    newTypedRateLimitingQueue("AddIptablesSnatRule", custCrdRateLimiter),
×
496
                updateIptablesSnatRuleQueue: newTypedRateLimitingQueue("UpdateIptablesSnatRule", custCrdRateLimiter),
×
497
                delIptablesSnatRuleQueue:    newTypedRateLimitingQueue("DeleteIptablesSnatRule", custCrdRateLimiter),
×
498

×
499
                vlansLister:     vlanInformer.Lister(),
×
500
                vlanSynced:      vlanInformer.Informer().HasSynced,
×
501
                addVlanQueue:    newTypedRateLimitingQueue[string]("AddVlan", nil),
×
502
                delVlanQueue:    newTypedRateLimitingQueue[string]("DeleteVlan", nil),
×
503
                updateVlanQueue: newTypedRateLimitingQueue[string]("UpdateVlan", nil),
×
504
                vlanKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
505

×
506
                providerNetworksLister: providerNetworkInformer.Lister(),
×
507
                providerNetworkSynced:  providerNetworkInformer.Informer().HasSynced,
×
508

×
509
                podsLister:          podInformer.Lister(),
×
510
                podsSynced:          podInformer.Informer().HasSynced,
×
511
                addOrUpdatePodQueue: newTypedRateLimitingQueue[string]("AddOrUpdatePod", nil),
×
512
                deletePodQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
×
513
                        workqueue.DefaultTypedControllerRateLimiter[string](),
×
514
                        workqueue.TypedRateLimitingQueueConfig[string]{
×
515
                                Name:          "DeletePod",
×
516
                                DelayingQueue: workqueue.NewTypedDelayingQueue[string](),
×
517
                        },
×
518
                ),
×
519
                updatePodSecurityQueue: newTypedRateLimitingQueue[string]("UpdatePodSecurity", nil),
×
520
                podKeyMutex:            keymutex.NewHashed(numKeyLocks),
×
521

×
522
                namespacesLister:  namespaceInformer.Lister(),
×
523
                namespacesSynced:  namespaceInformer.Informer().HasSynced,
×
524
                addNamespaceQueue: newTypedRateLimitingQueue[string]("AddNamespace", nil),
×
525
                nsKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
526

×
527
                nodesLister:     nodeInformer.Lister(),
×
528
                nodesSynced:     nodeInformer.Informer().HasSynced,
×
529
                addNodeQueue:    newTypedRateLimitingQueue[string]("AddNode", nil),
×
530
                updateNodeQueue: newTypedRateLimitingQueue[string]("UpdateNode", nil),
×
531
                deleteNodeQueue: newTypedRateLimitingQueue[string]("DeleteNode", nil),
×
532
                nodeKeyMutex:    keymutex.NewHashed(numKeyLocks),
×
533

×
534
                servicesLister:     serviceInformer.Lister(),
×
535
                serviceSynced:      serviceInformer.Informer().HasSynced,
×
536
                addServiceQueue:    newTypedRateLimitingQueue[string]("AddService", nil),
×
537
                deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
×
538
                updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
×
539
                svcKeyMutex:        keymutex.NewHashed(numKeyLocks),
×
540

×
541
                endpointSlicesLister:          endpointSliceInformer.Lister(),
×
542
                endpointSlicesSynced:          endpointSliceInformer.Informer().HasSynced,
×
543
                addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
×
544
                epKeyMutex:                    keymutex.NewHashed(numKeyLocks),
×
545

×
546
                deploymentsLister: deploymentInformer.Lister(),
×
547
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
548

×
549
                qosPoliciesLister:    qosPolicyInformer.Lister(),
×
550
                qosPolicySynced:      qosPolicyInformer.Informer().HasSynced,
×
551
                addQoSPolicyQueue:    newTypedRateLimitingQueue("AddQoSPolicy", custCrdRateLimiter),
×
552
                updateQoSPolicyQueue: newTypedRateLimitingQueue("UpdateQoSPolicy", custCrdRateLimiter),
×
553
                delQoSPolicyQueue:    newTypedRateLimitingQueue("DeleteQoSPolicy", custCrdRateLimiter),
×
554

×
555
                configMapsLister: configMapInformer.Lister(),
×
556
                configMapsSynced: configMapInformer.Informer().HasSynced,
×
557

×
558
                sgKeyMutex:         keymutex.NewHashed(numKeyLocks),
×
559
                sgsLister:          sgInformer.Lister(),
×
560
                sgSynced:           sgInformer.Informer().HasSynced,
×
561
                addOrUpdateSgQueue: newTypedRateLimitingQueue[string]("UpdateSecurityGroup", nil),
×
562
                delSgQueue:         newTypedRateLimitingQueue[string]("DeleteSecurityGroup", nil),
×
563
                syncSgPortsQueue:   newTypedRateLimitingQueue[string]("SyncSecurityGroupPorts", nil),
×
564

×
565
                ovnEipsLister:     ovnEipInformer.Lister(),
×
566
                ovnEipSynced:      ovnEipInformer.Informer().HasSynced,
×
567
                addOvnEipQueue:    newTypedRateLimitingQueue("AddOvnEip", custCrdRateLimiter),
×
568
                updateOvnEipQueue: newTypedRateLimitingQueue("UpdateOvnEip", custCrdRateLimiter),
×
569
                resetOvnEipQueue:  newTypedRateLimitingQueue("ResetOvnEip", custCrdRateLimiter),
×
570
                delOvnEipQueue:    newTypedRateLimitingQueue[*kubeovnv1.OvnEip]("DeleteOvnEip", nil),
×
571

×
572
                ovnFipsLister:     ovnFipInformer.Lister(),
×
573
                ovnFipSynced:      ovnFipInformer.Informer().HasSynced,
×
574
                addOvnFipQueue:    newTypedRateLimitingQueue("AddOvnFip", custCrdRateLimiter),
×
575
                updateOvnFipQueue: newTypedRateLimitingQueue("UpdateOvnFip", custCrdRateLimiter),
×
576
                delOvnFipQueue:    newTypedRateLimitingQueue("DeleteOvnFip", custCrdRateLimiter),
×
577

×
578
                ovnSnatRulesLister:     ovnSnatRuleInformer.Lister(),
×
579
                ovnSnatRuleSynced:      ovnSnatRuleInformer.Informer().HasSynced,
×
580
                addOvnSnatRuleQueue:    newTypedRateLimitingQueue("AddOvnSnatRule", custCrdRateLimiter),
×
581
                updateOvnSnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnSnatRule", custCrdRateLimiter),
×
582
                delOvnSnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnSnatRule", custCrdRateLimiter),
×
583

×
584
                ovnDnatRulesLister:     ovnDnatRuleInformer.Lister(),
×
585
                ovnDnatRuleSynced:      ovnDnatRuleInformer.Informer().HasSynced,
×
586
                addOvnDnatRuleQueue:    newTypedRateLimitingQueue("AddOvnDnatRule", custCrdRateLimiter),
×
587
                updateOvnDnatRuleQueue: newTypedRateLimitingQueue("UpdateOvnDnatRule", custCrdRateLimiter),
×
588
                delOvnDnatRuleQueue:    newTypedRateLimitingQueue("DeleteOvnDnatRule", custCrdRateLimiter),
×
589

×
590
                csrLister:           csrInformer.Lister(),
×
591
                csrSynced:           csrInformer.Informer().HasSynced,
×
592
                addOrUpdateCsrQueue: newTypedRateLimitingQueue("AddOrUpdateCSR", custCrdRateLimiter),
×
593

×
594
                addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
×
595
                deleteVMQueue:                newTypedRateLimitingQueue[string]("DeleteVM", nil),
×
596
                kubevirtInformerFactory:      kubevirtInformerFactory,
×
597

×
598
                netAttachLister:          netAttachInformer.Lister(),
×
599
                netAttachSynced:          netAttachInformer.Informer().HasSynced,
×
600
                netAttachInformerFactory: attachNetInformerFactory,
×
601

×
602
                recorder:               recorder,
×
603
                informerFactory:        informerFactory,
×
604
                cmInformerFactory:      cmInformerFactory,
×
605
                deployInformerFactory:  deployInformerFactory,
×
606
                kubeovnInformerFactory: kubeovnInformerFactory,
×
607
                anpInformerFactory:     anpInformerFactory,
×
608
        }
×
609

×
610
        if controller.OVNNbClient, err = ovs.NewOvnNbClient(
×
611
                config.OvnNbAddr,
×
612
                config.OvnTimeout,
×
613
                config.OvsDbConnectTimeout,
×
614
                config.OvsDbInactivityTimeout,
×
615
                config.OvsDbConnectMaxRetry,
×
616
        ); err != nil {
×
617
                util.LogFatalAndExit(err, "failed to create ovn nb client")
×
618
        }
×
619
        if controller.OVNSbClient, err = ovs.NewOvnSbClient(
×
620
                config.OvnSbAddr,
×
621
                config.OvnTimeout,
×
622
                config.OvsDbConnectTimeout,
×
623
                config.OvsDbInactivityTimeout,
×
624
                config.OvsDbConnectMaxRetry,
×
625
        ); err != nil {
×
626
                util.LogFatalAndExit(err, "failed to create ovn sb client")
×
627
        }
×
628
        if config.EnableLb {
×
629
                controller.switchLBRuleLister = switchLBRuleInformer.Lister()
×
630
                controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced
×
631
                controller.addSwitchLBRuleQueue = newTypedRateLimitingQueue("AddSwitchLBRule", custCrdRateLimiter)
×
632
                controller.delSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
633
                        "DeleteSwitchLBRule",
×
634
                        workqueue.NewTypedMaxOfRateLimiter(
×
635
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
636
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
637
                        ),
×
638
                )
×
639
                controller.updateSwitchLBRuleQueue = newTypedRateLimitingQueue(
×
640
                        "UpdateSwitchLBRule",
×
641
                        workqueue.NewTypedMaxOfRateLimiter(
×
642
                                workqueue.NewTypedItemExponentialFailureRateLimiter[*SlrInfo](time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
×
643
                                &workqueue.TypedBucketRateLimiter[*SlrInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
×
644
                        ),
×
645
                )
×
646

×
647
                controller.vpcDNSLister = vpcDNSInformer.Lister()
×
648
                controller.vpcDNSSynced = vpcDNSInformer.Informer().HasSynced
×
649
                controller.addOrUpdateVpcDNSQueue = newTypedRateLimitingQueue("AddOrUpdateVpcDns", custCrdRateLimiter)
×
UNCOV
650
                controller.delVpcDNSQueue = newTypedRateLimitingQueue("DeleteVpcDns", custCrdRateLimiter)
×
651
        }
×
652

653
        if config.EnableNP {
×
654
                controller.npsLister = npInformer.Lister()
×
655
                controller.npsSynced = npInformer.Informer().HasSynced
×
656
                controller.updateNpQueue = newTypedRateLimitingQueue[string]("UpdateNetworkPolicy", nil)
×
657
                controller.deleteNpQueue = newTypedRateLimitingQueue[string]("DeleteNetworkPolicy", nil)
×
UNCOV
658
                controller.npKeyMutex = keymutex.NewHashed(numKeyLocks)
×
659
        }
×
660

661
        if config.EnableANP {
×
662
                controller.anpsLister = anpInformer.Lister()
×
663
                controller.anpsSynced = anpInformer.Informer().HasSynced
×
664
                controller.addAnpQueue = newTypedRateLimitingQueue[string]("AddAdminNetworkPolicy", nil)
×
665
                controller.updateAnpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateAdminNetworkPolicy", nil)
×
666
                controller.deleteAnpQueue = newTypedRateLimitingQueue[*v1alpha1.AdminNetworkPolicy]("DeleteAdminNetworkPolicy", nil)
×
667
                controller.anpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
668

×
669
                controller.banpsLister = banpInformer.Lister()
×
670
                controller.banpsSynced = banpInformer.Informer().HasSynced
×
671
                controller.addBanpQueue = newTypedRateLimitingQueue[string]("AddBaseAdminNetworkPolicy", nil)
×
672
                controller.updateBanpQueue = newTypedRateLimitingQueue[*AdminNetworkPolicyChangedDelta]("UpdateBaseAdminNetworkPolicy", nil)
×
673
                controller.deleteBanpQueue = newTypedRateLimitingQueue[*v1alpha1.BaselineAdminNetworkPolicy]("DeleteBaseAdminNetworkPolicy", nil)
×
674
                controller.banpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
675

×
676
                controller.cnpsLister = cnpInformer.Lister()
×
677
                controller.cnpsSynced = cnpInformer.Informer().HasSynced
×
678
                controller.addCnpQueue = newTypedRateLimitingQueue[string]("AddClusterNetworkPolicy", nil)
×
679
                controller.updateCnpQueue = newTypedRateLimitingQueue[*ClusterNetworkPolicyChangedDelta]("UpdateClusterNetworkPolicy", nil)
×
680
                controller.deleteCnpQueue = newTypedRateLimitingQueue[*netpolv1alpha2.ClusterNetworkPolicy]("DeleteClusterNetworkPolicy", nil)
×
UNCOV
681
                controller.cnpKeyMutex = keymutex.NewHashed(numKeyLocks)
×
682
        }
×
683

684
        if config.EnableDNSNameResolver {
×
685
                controller.dnsNameResolversLister = dnsNameResolverInformer.Lister()
×
686
                controller.dnsNameResolversSynced = dnsNameResolverInformer.Informer().HasSynced
×
687
                controller.addOrUpdateDNSNameResolverQueue = newTypedRateLimitingQueue[string]("AddOrUpdateDNSNameResolver", nil)
×
UNCOV
688
                controller.deleteDNSNameResolverQueue = newTypedRateLimitingQueue[*kubeovnv1.DNSNameResolver]("DeleteDNSNameResolver", nil)
×
689
        }
×
690

691
        defer controller.shutdown()
×
692
        klog.Info("Starting OVN controller")
×
693

×
694
        // Start and sync NAD informer first, as many resources depend on NAD cache
×
695
        // NAD CRD is optional, so we check if it exists before starting the informer
×
696
        controller.StartNetAttachInformerFactory(ctx)
×
697

×
698
        // Wait for the caches to be synced before starting workers
×
699
        controller.informerFactory.Start(ctx.Done())
×
700
        controller.cmInformerFactory.Start(ctx.Done())
×
701
        controller.deployInformerFactory.Start(ctx.Done())
×
702
        controller.kubeovnInformerFactory.Start(ctx.Done())
×
703
        controller.anpInformerFactory.Start(ctx.Done())
×
704
        controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
×
705

×
706
        klog.Info("Waiting for informer caches to sync")
×
707
        cacheSyncs := []cache.InformerSynced{
×
708
                controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced,
×
709
                controller.vpcSynced, controller.subnetSynced,
×
710
                controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
×
711
                controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
×
712
                controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
×
713
                controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
×
714
                controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
×
715
                controller.ovnDnatRuleSynced,
×
716
        }
×
717
        if controller.config.EnableLb {
×
718
                cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDNSSynced)
×
719
        }
×
720
        if controller.config.EnableNP {
×
721
                cacheSyncs = append(cacheSyncs, controller.npsSynced)
×
722
        }
×
723
        if controller.config.EnableANP {
×
724
                cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced, controller.cnpsSynced)
×
725
        }
×
726
        if controller.config.EnableDNSNameResolver {
×
UNCOV
727
                cacheSyncs = append(cacheSyncs, controller.dnsNameResolversSynced)
×
728
        }
×
729

730
        if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
×
UNCOV
731
                util.LogFatalAndExit(nil, "failed to wait for caches to sync")
×
732
        }
×
733

734
        if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
735
                AddFunc:    controller.enqueueAddPod,
×
736
                DeleteFunc: controller.enqueueDeletePod,
×
737
                UpdateFunc: controller.enqueueUpdatePod,
×
738
        }); err != nil {
×
UNCOV
739
                util.LogFatalAndExit(err, "failed to add pod event handler")
×
740
        }
×
741

742
        if _, err = namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
743
                AddFunc:    controller.enqueueAddNamespace,
×
744
                UpdateFunc: controller.enqueueUpdateNamespace,
×
745
                DeleteFunc: controller.enqueueDeleteNamespace,
×
746
        }); err != nil {
×
UNCOV
747
                util.LogFatalAndExit(err, "failed to add namespace event handler")
×
748
        }
×
749

750
        if _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
751
                AddFunc:    controller.enqueueAddNode,
×
752
                UpdateFunc: controller.enqueueUpdateNode,
×
753
                DeleteFunc: controller.enqueueDeleteNode,
×
754
        }); err != nil {
×
UNCOV
755
                util.LogFatalAndExit(err, "failed to add node event handler")
×
756
        }
×
757

758
        if _, err = serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
759
                AddFunc:    controller.enqueueAddService,
×
760
                DeleteFunc: controller.enqueueDeleteService,
×
761
                UpdateFunc: controller.enqueueUpdateService,
×
762
        }); err != nil {
×
UNCOV
763
                util.LogFatalAndExit(err, "failed to add service event handler")
×
764
        }
×
765

766
        if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
767
                AddFunc:    controller.enqueueAddEndpointSlice,
×
768
                UpdateFunc: controller.enqueueUpdateEndpointSlice,
×
769
        }); err != nil {
×
UNCOV
770
                util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
×
771
        }
×
772

773
        if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
774
                AddFunc:    controller.enqueueAddDeployment,
×
775
                UpdateFunc: controller.enqueueUpdateDeployment,
×
776
        }); err != nil {
×
UNCOV
777
                util.LogFatalAndExit(err, "failed to add deployment event handler")
×
778
        }
×
779

780
        if _, err = vpcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
781
                AddFunc:    controller.enqueueAddVpc,
×
782
                UpdateFunc: controller.enqueueUpdateVpc,
×
783
                DeleteFunc: controller.enqueueDelVpc,
×
784
        }); err != nil {
×
UNCOV
785
                util.LogFatalAndExit(err, "failed to add vpc event handler")
×
786
        }
×
787

788
        if _, err = vpcNatGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
789
                AddFunc:    controller.enqueueAddVpcNatGw,
×
790
                UpdateFunc: controller.enqueueUpdateVpcNatGw,
×
791
                DeleteFunc: controller.enqueueDeleteVpcNatGw,
×
792
        }); err != nil {
×
UNCOV
793
                util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler")
×
794
        }
×
795

796
        if _, err = vpcEgressGatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
797
                AddFunc:    controller.enqueueAddVpcEgressGateway,
×
798
                UpdateFunc: controller.enqueueUpdateVpcEgressGateway,
×
799
                DeleteFunc: controller.enqueueDeleteVpcEgressGateway,
×
800
        }); err != nil {
×
UNCOV
801
                util.LogFatalAndExit(err, "failed to add vpc egress gateway event handler")
×
802
        }
×
803

804
        if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
805
                AddFunc:    controller.enqueueAddSubnet,
×
806
                UpdateFunc: controller.enqueueUpdateSubnet,
×
807
                DeleteFunc: controller.enqueueDeleteSubnet,
×
808
        }); err != nil {
×
UNCOV
809
                util.LogFatalAndExit(err, "failed to add subnet event handler")
×
810
        }
×
811

812
        if _, err = ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
813
                AddFunc:    controller.enqueueAddIPPool,
×
814
                UpdateFunc: controller.enqueueUpdateIPPool,
×
815
                DeleteFunc: controller.enqueueDeleteIPPool,
×
816
        }); err != nil {
×
UNCOV
817
                util.LogFatalAndExit(err, "failed to add ippool event handler")
×
818
        }
×
819

820
        if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
821
                AddFunc:    controller.enqueueAddIP,
×
822
                UpdateFunc: controller.enqueueUpdateIP,
×
823
                DeleteFunc: controller.enqueueDelIP,
×
824
        }); err != nil {
×
UNCOV
825
                util.LogFatalAndExit(err, "failed to add ips event handler")
×
826
        }
×
827

828
        if _, err = vlanInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
829
                AddFunc:    controller.enqueueAddVlan,
×
830
                DeleteFunc: controller.enqueueDelVlan,
×
831
                UpdateFunc: controller.enqueueUpdateVlan,
×
832
        }); err != nil {
×
UNCOV
833
                util.LogFatalAndExit(err, "failed to add vlan event handler")
×
834
        }
×
835

836
        if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
837
                AddFunc:    controller.enqueueAddSg,
×
838
                DeleteFunc: controller.enqueueDeleteSg,
×
839
                UpdateFunc: controller.enqueueUpdateSg,
×
840
        }); err != nil {
×
UNCOV
841
                util.LogFatalAndExit(err, "failed to add security group event handler")
×
842
        }
×
843

844
        if _, err = virtualIPInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
845
                AddFunc:    controller.enqueueAddVirtualIP,
×
846
                UpdateFunc: controller.enqueueUpdateVirtualIP,
×
847
                DeleteFunc: controller.enqueueDelVirtualIP,
×
848
        }); err != nil {
×
UNCOV
849
                util.LogFatalAndExit(err, "failed to add virtual ip event handler")
×
850
        }
×
851

852
        if _, err = iptablesEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
853
                AddFunc:    controller.enqueueAddIptablesEip,
×
854
                UpdateFunc: controller.enqueueUpdateIptablesEip,
×
855
                DeleteFunc: controller.enqueueDelIptablesEip,
×
856
        }); err != nil {
×
UNCOV
857
                util.LogFatalAndExit(err, "failed to add iptables eip event handler")
×
858
        }
×
859

860
        if _, err = iptablesFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
861
                AddFunc:    controller.enqueueAddIptablesFip,
×
862
                UpdateFunc: controller.enqueueUpdateIptablesFip,
×
863
                DeleteFunc: controller.enqueueDelIptablesFip,
×
864
        }); err != nil {
×
UNCOV
865
                util.LogFatalAndExit(err, "failed to add iptables fip event handler")
×
866
        }
×
867

868
        if _, err = iptablesDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
869
                AddFunc:    controller.enqueueAddIptablesDnatRule,
×
870
                UpdateFunc: controller.enqueueUpdateIptablesDnatRule,
×
871
                DeleteFunc: controller.enqueueDelIptablesDnatRule,
×
872
        }); err != nil {
×
UNCOV
873
                util.LogFatalAndExit(err, "failed to add iptables dnat event handler")
×
874
        }
×
875

876
        if _, err = iptablesSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
877
                AddFunc:    controller.enqueueAddIptablesSnatRule,
×
878
                UpdateFunc: controller.enqueueUpdateIptablesSnatRule,
×
879
                DeleteFunc: controller.enqueueDelIptablesSnatRule,
×
880
        }); err != nil {
×
UNCOV
881
                util.LogFatalAndExit(err, "failed to add iptables snat rule event handler")
×
882
        }
×
883

884
        if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
885
                AddFunc:    controller.enqueueAddOvnEip,
×
886
                UpdateFunc: controller.enqueueUpdateOvnEip,
×
887
                DeleteFunc: controller.enqueueDelOvnEip,
×
888
        }); err != nil {
×
UNCOV
889
                util.LogFatalAndExit(err, "failed to add ovn eip event handler")
×
890
        }
×
891

892
        if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
893
                AddFunc:    controller.enqueueAddOvnFip,
×
894
                UpdateFunc: controller.enqueueUpdateOvnFip,
×
895
                DeleteFunc: controller.enqueueDelOvnFip,
×
896
        }); err != nil {
×
UNCOV
897
                util.LogFatalAndExit(err, "failed to add ovn fip event handler")
×
898
        }
×
899

900
        if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
901
                AddFunc:    controller.enqueueAddOvnSnatRule,
×
902
                UpdateFunc: controller.enqueueUpdateOvnSnatRule,
×
903
                DeleteFunc: controller.enqueueDelOvnSnatRule,
×
904
        }); err != nil {
×
UNCOV
905
                util.LogFatalAndExit(err, "failed to add ovn snat rule event handler")
×
906
        }
×
907

908
        if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
909
                AddFunc:    controller.enqueueAddOvnDnatRule,
×
910
                UpdateFunc: controller.enqueueUpdateOvnDnatRule,
×
911
                DeleteFunc: controller.enqueueDelOvnDnatRule,
×
912
        }); err != nil {
×
UNCOV
913
                util.LogFatalAndExit(err, "failed to add ovn dnat rule event handler")
×
914
        }
×
915

916
        if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
917
                AddFunc:    controller.enqueueAddQoSPolicy,
×
918
                UpdateFunc: controller.enqueueUpdateQoSPolicy,
×
919
                DeleteFunc: controller.enqueueDelQoSPolicy,
×
920
        }); err != nil {
×
UNCOV
921
                util.LogFatalAndExit(err, "failed to add qos policy event handler")
×
922
        }
×
923

924
        if config.EnableLb {
×
925
                if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
926
                        AddFunc:    controller.enqueueAddSwitchLBRule,
×
927
                        UpdateFunc: controller.enqueueUpdateSwitchLBRule,
×
928
                        DeleteFunc: controller.enqueueDeleteSwitchLBRule,
×
929
                }); err != nil {
×
UNCOV
930
                        util.LogFatalAndExit(err, "failed to add switch lb rule event handler")
×
931
                }
×
932

933
                if _, err = vpcDNSInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
934
                        AddFunc:    controller.enqueueAddVpcDNS,
×
935
                        UpdateFunc: controller.enqueueUpdateVpcDNS,
×
936
                        DeleteFunc: controller.enqueueDeleteVPCDNS,
×
937
                }); err != nil {
×
UNCOV
938
                        util.LogFatalAndExit(err, "failed to add vpc dns event handler")
×
UNCOV
939
                }
×
940
        }
941

942
        if config.EnableNP {
×
943
                if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
944
                        AddFunc:    controller.enqueueAddNp,
×
945
                        UpdateFunc: controller.enqueueUpdateNp,
×
946
                        DeleteFunc: controller.enqueueDeleteNp,
×
947
                }); err != nil {
×
UNCOV
948
                        util.LogFatalAndExit(err, "failed to add network policy event handler")
×
UNCOV
949
                }
×
950
        }
951

952
        if config.EnableANP {
×
953
                if _, err = anpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
954
                        AddFunc:    controller.enqueueAddAnp,
×
955
                        UpdateFunc: controller.enqueueUpdateAnp,
×
956
                        DeleteFunc: controller.enqueueDeleteAnp,
×
957
                }); err != nil {
×
UNCOV
958
                        util.LogFatalAndExit(err, "failed to add admin network policy event handler")
×
959
                }
×
960

961
                if _, err = banpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
962
                        AddFunc:    controller.enqueueAddBanp,
×
963
                        UpdateFunc: controller.enqueueUpdateBanp,
×
964
                        DeleteFunc: controller.enqueueDeleteBanp,
×
965
                }); err != nil {
×
UNCOV
966
                        util.LogFatalAndExit(err, "failed to add baseline admin network policy event handler")
×
967
                }
×
968

969
                if _, err = cnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
970
                        AddFunc:    controller.enqueueAddCnp,
×
971
                        UpdateFunc: controller.enqueueUpdateCnp,
×
972
                        DeleteFunc: controller.enqueueDeleteCnp,
×
973
                }); err != nil {
×
UNCOV
974
                        util.LogFatalAndExit(err, "failed to add cluster network policy event handler")
×
975
                }
×
976

977
                maxPriorityPerMap := util.CnpMaxPriority + 1
×
978
                controller.anpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
979
                controller.anpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
UNCOV
980
                controller.bnpPrioNameMap = make(map[int32]string, maxPriorityPerMap)
×
UNCOV
981
                controller.bnpNamePrioMap = make(map[string]int32, maxPriorityPerMap)
×
982
        }
983

984
        if config.EnableDNSNameResolver {
×
985
                if _, err = dnsNameResolverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
986
                        AddFunc:    controller.enqueueAddDNSNameResolver,
×
987
                        UpdateFunc: controller.enqueueUpdateDNSNameResolver,
×
988
                        DeleteFunc: controller.enqueueDeleteDNSNameResolver,
×
989
                }); err != nil {
×
UNCOV
990
                        util.LogFatalAndExit(err, "failed to add dns name resolver event handler")
×
UNCOV
991
                }
×
992
        }
993

994
        if config.EnableOVNIPSec {
×
995
                if _, err = csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
996
                        AddFunc:    controller.enqueueAddCsr,
×
997
                        UpdateFunc: controller.enqueueUpdateCsr,
×
998
                        // no need to add delete func for csr
×
999
                }); err != nil {
×
UNCOV
1000
                        util.LogFatalAndExit(err, "failed to add csr event handler")
×
UNCOV
1001
                }
×
1002
        }
1003

UNCOV
1004
        controller.Run(ctx)
×
1005
}
1006

1007
// Run will set up the event handlers for types we are interested in, as well
1008
// as syncing informer caches and starting workers. It will block until stopCh
1009
// is closed, at which point it will shutdown the workqueue and wait for
1010
// workers to finish processing their current work items.
1011
func (c *Controller) Run(ctx context.Context) {
×
1012
        // The init process can only be placed here if the init process do really affect the normal process of controller, such as Nodes/Pods/Subnets...
×
1013
        // Otherwise, the init process should be placed after all workers have already started working
×
1014
        if err := c.OVNNbClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil {
×
UNCOV
1015
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst")
×
1016
        }
×
1017

1018
        if err := c.OVNNbClient.SetUseCtInvMatch(); err != nil {
×
UNCOV
1019
                util.LogFatalAndExit(err, "failed to set NB_Global option use_ct_inv_match to false")
×
1020
        }
×
1021

1022
        if err := c.OVNNbClient.SetLsCtSkipDstLportIPs(c.config.LsCtSkipDstLportIPs); err != nil {
×
UNCOV
1023
                util.LogFatalAndExit(err, "failed to set NB_Global option ls_ct_skip_dst_lport_ips")
×
1024
        }
×
1025

1026
        if err := c.OVNNbClient.SetNodeLocalDNSIP(strings.Join(c.config.NodeLocalDNSIPs, ",")); err != nil {
×
UNCOV
1027
                util.LogFatalAndExit(err, "failed to set NB_Global option node_local_dns_ip")
×
1028
        }
×
1029

1030
        if err := c.OVNNbClient.SetSkipConntrackCidrs(c.config.SkipConntrackDstCidrs); err != nil {
×
UNCOV
1031
                util.LogFatalAndExit(err, "failed to set NB_Global option skip_conntrack_ipcidrs")
×
1032
        }
×
1033

1034
        if err := c.OVNNbClient.SetOVNIPSec(c.config.EnableOVNIPSec); err != nil {
×
UNCOV
1035
                util.LogFatalAndExit(err, "failed to set NB_Global ipsec")
×
1036
        }
×
1037

1038
        if err := c.InitOVN(); err != nil {
×
UNCOV
1039
                util.LogFatalAndExit(err, "failed to initialize ovn resources")
×
UNCOV
1040
        }
×
1041

1042
        // sync ip crd before initIPAM since ip crd will be used to restore vm and statefulset pod in initIPAM
1043
        if err := c.syncIPCR(); err != nil {
×
UNCOV
1044
                util.LogFatalAndExit(err, "failed to sync crd ips")
×
1045
        }
×
1046

1047
        if err := c.syncFinalizers(); err != nil {
×
UNCOV
1048
                util.LogFatalAndExit(err, "failed to initialize crd finalizers")
×
1049
        }
×
1050

1051
        if err := c.InitIPAM(); err != nil {
×
UNCOV
1052
                util.LogFatalAndExit(err, "failed to initialize ipam")
×
1053
        }
×
1054

1055
        if err := c.syncNodeRoutes(); err != nil {
×
UNCOV
1056
                util.LogFatalAndExit(err, "failed to initialize node routes")
×
1057
        }
×
1058

1059
        if err := c.syncSubnetCR(); err != nil {
×
UNCOV
1060
                util.LogFatalAndExit(err, "failed to sync crd subnets")
×
1061
        }
×
1062

1063
        if err := c.syncVlanCR(); err != nil {
×
UNCOV
1064
                util.LogFatalAndExit(err, "failed to sync crd vlans")
×
1065
        }
×
1066

1067
        if c.config.EnableOVNIPSec && !c.config.CertManagerIPSecCert {
×
1068
                if err := c.InitDefaultOVNIPsecCA(); err != nil {
×
UNCOV
1069
                        util.LogFatalAndExit(err, "failed to init ovn ipsec CA")
×
UNCOV
1070
                }
×
1071
        }
1072

1073
        // start workers to do all the network operations
1074
        c.startWorkers(ctx)
×
1075

×
1076
        c.initResourceOnce()
×
1077
        <-ctx.Done()
×
1078
        klog.Info("Shutting down workers")
×
1079

×
UNCOV
1080
        c.OVNNbClient.Close()
×
UNCOV
1081
        c.OVNSbClient.Close()
×
1082
}
1083

1084
func (c *Controller) dbStatus() {
×
1085
        const maxFailures = 5
×
1086

×
1087
        done := make(chan error, 2)
×
1088
        go func() {
×
1089
                done <- c.OVNNbClient.Echo(context.Background())
×
1090
        }()
×
1091
        go func() {
×
UNCOV
1092
                done <- c.OVNSbClient.Echo(context.Background())
×
1093
        }()
×
1094

1095
        resultsReceived := 0
×
1096
        timeout := time.After(time.Duration(c.config.OvnTimeout) * time.Second)
×
1097

×
1098
        for resultsReceived < 2 {
×
1099
                select {
×
1100
                case err := <-done:
×
1101
                        resultsReceived++
×
1102
                        if err != nil {
×
1103
                                c.dbFailureCount++
×
1104
                                klog.Errorf("OVN database echo failed (%d/%d): %v", c.dbFailureCount, maxFailures, err)
×
1105
                                if c.dbFailureCount >= maxFailures {
×
1106
                                        util.LogFatalAndExit(err, "OVN database connection failed after %d attempts", maxFailures)
×
UNCOV
1107
                                }
×
1108
                                return
×
1109
                        }
1110
                case <-timeout:
×
1111
                        c.dbFailureCount++
×
1112
                        klog.Errorf("OVN database echo timeout (%d/%d) after %ds", c.dbFailureCount, maxFailures, c.config.OvnTimeout)
×
1113
                        if c.dbFailureCount >= maxFailures {
×
1114
                                util.LogFatalAndExit(nil, "OVN database connection timeout after %d attempts", maxFailures)
×
UNCOV
1115
                        }
×
UNCOV
1116
                        return
×
1117
                }
1118
        }
1119

1120
        if c.dbFailureCount > 0 {
×
1121
                klog.Infof("OVN database connection recovered after %d failures", c.dbFailureCount)
×
UNCOV
1122
                c.dbFailureCount = 0
×
UNCOV
1123
        }
×
1124
}
1125

1126
func (c *Controller) shutdown() {
×
1127
        utilruntime.HandleCrash()
×
1128

×
1129
        c.addOrUpdatePodQueue.ShutDown()
×
1130
        c.deletePodQueue.ShutDown()
×
1131
        c.updatePodSecurityQueue.ShutDown()
×
1132

×
1133
        c.addNamespaceQueue.ShutDown()
×
1134

×
1135
        c.addOrUpdateSubnetQueue.ShutDown()
×
1136
        c.deleteSubnetQueue.ShutDown()
×
1137
        c.updateSubnetStatusQueue.ShutDown()
×
1138
        c.syncVirtualPortsQueue.ShutDown()
×
1139

×
1140
        c.addOrUpdateIPPoolQueue.ShutDown()
×
1141
        c.updateIPPoolStatusQueue.ShutDown()
×
1142
        c.deleteIPPoolQueue.ShutDown()
×
1143

×
1144
        c.addNodeQueue.ShutDown()
×
1145
        c.updateNodeQueue.ShutDown()
×
1146
        c.deleteNodeQueue.ShutDown()
×
1147

×
1148
        c.addServiceQueue.ShutDown()
×
1149
        c.deleteServiceQueue.ShutDown()
×
1150
        c.updateServiceQueue.ShutDown()
×
1151
        c.addOrUpdateEndpointSliceQueue.ShutDown()
×
1152

×
1153
        c.addVlanQueue.ShutDown()
×
1154
        c.delVlanQueue.ShutDown()
×
1155
        c.updateVlanQueue.ShutDown()
×
1156

×
1157
        c.addOrUpdateVpcQueue.ShutDown()
×
1158
        c.updateVpcStatusQueue.ShutDown()
×
1159
        c.delVpcQueue.ShutDown()
×
1160

×
1161
        c.addOrUpdateVpcNatGatewayQueue.ShutDown()
×
1162
        c.initVpcNatGatewayQueue.ShutDown()
×
1163
        c.delVpcNatGatewayQueue.ShutDown()
×
1164
        c.updateVpcEipQueue.ShutDown()
×
1165
        c.updateVpcFloatingIPQueue.ShutDown()
×
1166
        c.updateVpcDnatQueue.ShutDown()
×
1167
        c.updateVpcSnatQueue.ShutDown()
×
1168
        c.updateVpcSubnetQueue.ShutDown()
×
1169

×
1170
        c.addOrUpdateVpcEgressGatewayQueue.ShutDown()
×
1171
        c.delVpcEgressGatewayQueue.ShutDown()
×
1172

×
1173
        if c.config.EnableLb {
×
1174
                c.addSwitchLBRuleQueue.ShutDown()
×
1175
                c.delSwitchLBRuleQueue.ShutDown()
×
1176
                c.updateSwitchLBRuleQueue.ShutDown()
×
1177

×
1178
                c.addOrUpdateVpcDNSQueue.ShutDown()
×
UNCOV
1179
                c.delVpcDNSQueue.ShutDown()
×
1180
        }
×
1181

1182
        c.addIPQueue.ShutDown()
×
1183
        c.updateIPQueue.ShutDown()
×
1184
        c.delIPQueue.ShutDown()
×
1185

×
1186
        c.addVirtualIPQueue.ShutDown()
×
1187
        c.updateVirtualIPQueue.ShutDown()
×
1188
        c.updateVirtualParentsQueue.ShutDown()
×
1189
        c.delVirtualIPQueue.ShutDown()
×
1190

×
1191
        c.addIptablesEipQueue.ShutDown()
×
1192
        c.updateIptablesEipQueue.ShutDown()
×
1193
        c.resetIptablesEipQueue.ShutDown()
×
1194
        c.delIptablesEipQueue.ShutDown()
×
1195

×
1196
        c.addIptablesFipQueue.ShutDown()
×
1197
        c.updateIptablesFipQueue.ShutDown()
×
1198
        c.delIptablesFipQueue.ShutDown()
×
1199

×
1200
        c.addIptablesDnatRuleQueue.ShutDown()
×
1201
        c.updateIptablesDnatRuleQueue.ShutDown()
×
1202
        c.delIptablesDnatRuleQueue.ShutDown()
×
1203

×
1204
        c.addIptablesSnatRuleQueue.ShutDown()
×
1205
        c.updateIptablesSnatRuleQueue.ShutDown()
×
1206
        c.delIptablesSnatRuleQueue.ShutDown()
×
1207

×
1208
        c.addQoSPolicyQueue.ShutDown()
×
1209
        c.updateQoSPolicyQueue.ShutDown()
×
1210
        c.delQoSPolicyQueue.ShutDown()
×
1211

×
1212
        c.addOvnEipQueue.ShutDown()
×
1213
        c.updateOvnEipQueue.ShutDown()
×
1214
        c.resetOvnEipQueue.ShutDown()
×
1215
        c.delOvnEipQueue.ShutDown()
×
1216

×
1217
        c.addOvnFipQueue.ShutDown()
×
1218
        c.updateOvnFipQueue.ShutDown()
×
1219
        c.delOvnFipQueue.ShutDown()
×
1220

×
1221
        c.addOvnSnatRuleQueue.ShutDown()
×
1222
        c.updateOvnSnatRuleQueue.ShutDown()
×
1223
        c.delOvnSnatRuleQueue.ShutDown()
×
1224

×
1225
        c.addOvnDnatRuleQueue.ShutDown()
×
1226
        c.updateOvnDnatRuleQueue.ShutDown()
×
1227
        c.delOvnDnatRuleQueue.ShutDown()
×
1228

×
1229
        if c.config.EnableNP {
×
1230
                c.updateNpQueue.ShutDown()
×
1231
                c.deleteNpQueue.ShutDown()
×
1232
        }
×
1233
        if c.config.EnableANP {
×
1234
                c.addAnpQueue.ShutDown()
×
1235
                c.updateAnpQueue.ShutDown()
×
1236
                c.deleteAnpQueue.ShutDown()
×
1237

×
1238
                c.addBanpQueue.ShutDown()
×
1239
                c.updateBanpQueue.ShutDown()
×
1240
                c.deleteBanpQueue.ShutDown()
×
1241

×
1242
                c.addCnpQueue.ShutDown()
×
1243
                c.updateCnpQueue.ShutDown()
×
UNCOV
1244
                c.deleteCnpQueue.ShutDown()
×
1245
        }
×
1246

1247
        if c.config.EnableDNSNameResolver {
×
1248
                c.addOrUpdateDNSNameResolverQueue.ShutDown()
×
UNCOV
1249
                c.deleteDNSNameResolverQueue.ShutDown()
×
1250
        }
×
1251

1252
        c.addOrUpdateSgQueue.ShutDown()
×
1253
        c.delSgQueue.ShutDown()
×
1254
        c.syncSgPortsQueue.ShutDown()
×
1255

×
1256
        c.addOrUpdateCsrQueue.ShutDown()
×
1257

×
1258
        if c.config.EnableLiveMigrationOptimize {
×
UNCOV
1259
                c.addOrUpdateVMIMigrationQueue.ShutDown()
×
UNCOV
1260
        }
×
1261
}
1262

1263
func (c *Controller) startWorkers(ctx context.Context) {
×
1264
        klog.Info("Starting workers")
×
1265

×
1266
        go wait.Until(runWorker("add/update vpc", c.addOrUpdateVpcQueue, c.handleAddOrUpdateVpc), time.Second, ctx.Done())
×
1267
        go wait.Until(runWorker("delete vpc", c.delVpcQueue, c.handleDelVpc), time.Second, ctx.Done())
×
1268
        go wait.Until(runWorker("update status of vpc", c.updateVpcStatusQueue, c.handleUpdateVpcStatus), time.Second, ctx.Done())
×
1269

×
1270
        go wait.Until(runWorker("add/update vpc nat gateway", c.addOrUpdateVpcNatGatewayQueue, c.handleAddOrUpdateVpcNatGw), time.Second, ctx.Done())
×
1271
        go wait.Until(runWorker("init vpc nat gateway", c.initVpcNatGatewayQueue, c.handleInitVpcNatGw), time.Second, ctx.Done())
×
1272
        go wait.Until(runWorker("delete vpc nat gateway", c.delVpcNatGatewayQueue, c.handleDelVpcNatGw), time.Second, ctx.Done())
×
1273
        go wait.Until(runWorker("add/update vpc egress gateway", c.addOrUpdateVpcEgressGatewayQueue, c.handleAddOrUpdateVpcEgressGateway), time.Second, ctx.Done())
×
1274
        go wait.Until(runWorker("delete vpc egress gateway", c.delVpcEgressGatewayQueue, c.handleDelVpcEgressGateway), time.Second, ctx.Done())
×
1275
        go wait.Until(runWorker("update fip for vpc nat gateway", c.updateVpcFloatingIPQueue, c.handleUpdateVpcFloatingIP), time.Second, ctx.Done())
×
1276
        go wait.Until(runWorker("update eip for vpc nat gateway", c.updateVpcEipQueue, c.handleUpdateVpcEip), time.Second, ctx.Done())
×
1277
        go wait.Until(runWorker("update dnat for vpc nat gateway", c.updateVpcDnatQueue, c.handleUpdateVpcDnat), time.Second, ctx.Done())
×
1278
        go wait.Until(runWorker("update snat for vpc nat gateway", c.updateVpcSnatQueue, c.handleUpdateVpcSnat), time.Second, ctx.Done())
×
1279
        go wait.Until(runWorker("update subnet route for vpc nat gateway", c.updateVpcSubnetQueue, c.handleUpdateNatGwSubnetRoute), time.Second, ctx.Done())
×
1280
        go wait.Until(runWorker("add/update csr", c.addOrUpdateCsrQueue, c.handleAddOrUpdateCsr), time.Second, ctx.Done())
×
1281
        // add default and join subnet and wait them ready
×
1282
        for range c.config.WorkerNum {
×
1283
                go wait.Until(runWorker("add/update subnet", c.addOrUpdateSubnetQueue, c.handleAddOrUpdateSubnet), time.Second, ctx.Done())
×
1284
        }
×
1285
        go wait.Until(runWorker("add/update ippool", c.addOrUpdateIPPoolQueue, c.handleAddOrUpdateIPPool), time.Second, ctx.Done())
×
1286
        go wait.Until(runWorker("add vlan", c.addVlanQueue, c.handleAddVlan), time.Second, ctx.Done())
×
1287
        go wait.Until(runWorker("add namespace", c.addNamespaceQueue, c.handleAddNamespace), time.Second, ctx.Done())
×
1288
        err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(_ context.Context) (done bool, err error) {
×
1289
                subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
×
1290
                klog.Infof("wait for subnets %v ready", subnets)
×
1291

×
1292
                return c.allSubnetReady(subnets...)
×
1293
        })
×
1294
        if err != nil {
×
UNCOV
1295
                klog.Fatalf("wait default and join subnet ready, error: %v", err)
×
1296
        }
×
1297

1298
        go wait.Until(runWorker("add/update security group", c.addOrUpdateSgQueue, func(key string) error { return c.handleAddOrUpdateSg(key, false) }), time.Second, ctx.Done())
×
1299
        go wait.Until(runWorker("delete security group", c.delSgQueue, c.handleDeleteSg), time.Second, ctx.Done())
×
1300
        go wait.Until(runWorker("ports for security group", c.syncSgPortsQueue, c.syncSgLogicalPort), time.Second, ctx.Done())
×
1301

×
1302
        // run node worker before handle any pods
×
1303
        for range c.config.WorkerNum {
×
1304
                go wait.Until(runWorker("add node", c.addNodeQueue, c.handleAddNode), time.Second, ctx.Done())
×
1305
                go wait.Until(runWorker("update node", c.updateNodeQueue, c.handleUpdateNode), time.Second, ctx.Done())
×
1306
                go wait.Until(runWorker("delete node", c.deleteNodeQueue, c.handleDeleteNode), time.Second, ctx.Done())
×
1307
        }
×
1308
        for {
×
1309
                ready := true
×
1310
                time.Sleep(3 * time.Second)
×
1311
                nodes, err := c.nodesLister.List(labels.Everything())
×
1312
                if err != nil {
×
1313
                        util.LogFatalAndExit(err, "failed to list nodes")
×
1314
                }
×
1315
                for _, node := range nodes {
×
1316
                        if node.Annotations[util.AllocatedAnnotation] != "true" {
×
1317
                                klog.Infof("wait node %s annotation ready", node.Name)
×
UNCOV
1318
                                ready = false
×
UNCOV
1319
                                break
×
1320
                        }
1321
                }
UNCOV
1322
                if ready {
×
UNCOV
1323
                        break
×
1324
                }
1325
        }
1326

1327
        if c.config.EnableLb {
×
1328
                go wait.Until(runWorker("add service", c.addServiceQueue, c.handleAddService), time.Second, ctx.Done())
×
1329
                // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
×
1330
                go wait.Until(runWorker("delete service", c.deleteServiceQueue, c.handleDeleteService), time.Second, ctx.Done())
×
1331

×
1332
                go wait.Until(runWorker("add/update switch lb rule", c.addSwitchLBRuleQueue, c.handleAddOrUpdateSwitchLBRule), time.Second, ctx.Done())
×
1333
                go wait.Until(runWorker("delete switch lb rule", c.delSwitchLBRuleQueue, c.handleDelSwitchLBRule), time.Second, ctx.Done())
×
1334
                go wait.Until(runWorker("delete switch lb rule", c.updateSwitchLBRuleQueue, c.handleUpdateSwitchLBRule), time.Second, ctx.Done())
×
1335

×
1336
                go wait.Until(runWorker("add/update vpc dns", c.addOrUpdateVpcDNSQueue, c.handleAddOrUpdateVPCDNS), time.Second, ctx.Done())
×
1337
                go wait.Until(runWorker("delete vpc dns", c.delVpcDNSQueue, c.handleDelVpcDNS), time.Second, ctx.Done())
×
1338
                go wait.Until(func() {
×
UNCOV
1339
                        c.resyncVpcDNSConfig()
×
UNCOV
1340
                }, 5*time.Second, ctx.Done())
×
1341
        }
1342

1343
        for range c.config.WorkerNum {
×
1344
                go wait.Until(runWorker("delete pod", c.deletePodQueue, c.handleDeletePod), time.Second, ctx.Done())
×
1345
                go wait.Until(runWorker("add/update pod", c.addOrUpdatePodQueue, c.handleAddOrUpdatePod), time.Second, ctx.Done())
×
1346
                go wait.Until(runWorker("update pod security", c.updatePodSecurityQueue, c.handleUpdatePodSecurity), time.Second, ctx.Done())
×
1347

×
1348
                go wait.Until(runWorker("delete subnet", c.deleteSubnetQueue, c.handleDeleteSubnet), time.Second, ctx.Done())
×
1349
                go wait.Until(runWorker("delete ippool", c.deleteIPPoolQueue, c.handleDeleteIPPool), time.Second, ctx.Done())
×
1350
                go wait.Until(runWorker("update status of subnet", c.updateSubnetStatusQueue, c.handleUpdateSubnetStatus), time.Second, ctx.Done())
×
1351
                go wait.Until(runWorker("update status of ippool", c.updateIPPoolStatusQueue, c.handleUpdateIPPoolStatus), time.Second, ctx.Done())
×
1352
                go wait.Until(runWorker("virtual port for subnet", c.syncVirtualPortsQueue, c.syncVirtualPort), time.Second, ctx.Done())
×
1353

×
1354
                if c.config.EnableLb {
×
1355
                        go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
×
UNCOV
1356
                        go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
×
1357
                }
×
1358

1359
                if c.config.EnableNP {
×
1360
                        go wait.Until(runWorker("update network policy", c.updateNpQueue, c.handleUpdateNp), time.Second, ctx.Done())
×
UNCOV
1361
                        go wait.Until(runWorker("delete network policy", c.deleteNpQueue, c.handleDeleteNp), time.Second, ctx.Done())
×
1362
                }
×
1363

UNCOV
1364
                go wait.Until(runWorker("delete vlan", c.delVlanQueue, c.handleDelVlan), time.Second, ctx.Done())
×
UNCOV
1365
                go wait.Until(runWorker("update vlan", c.updateVlanQueue, c.handleUpdateVlan), time.Second, ctx.Done())
×
1366
        }
1367

1368
        if c.config.EnableEipSnat {
×
1369
                go wait.Until(func() {
×
1370
                        // init l3 about the default vpc external lrp binding to the gw chassis
×
UNCOV
1371
                        c.resyncExternalGateway()
×
UNCOV
1372
                }, time.Second, ctx.Done())
×
1373

1374
                // maintain l3 ha about the vpc external lrp binding to the gw chassis
UNCOV
1375
                c.OVNNbClient.MonitorBFD()
×
1376
        }
1377
        // TODO: we should merge these two vpc nat config into one config and resync them together
1378
        go wait.Until(func() {
×
UNCOV
1379
                c.resyncVpcNatGwConfig()
×
1380
        }, time.Second, ctx.Done())
×
1381

1382
        go wait.Until(func() {
×
UNCOV
1383
                c.resyncVpcNatConfig()
×
1384
        }, time.Second, ctx.Done())
×
1385

1386
        if c.config.GCInterval != 0 {
×
1387
                go wait.Until(func() {
×
1388
                        if err := c.markAndCleanLSP(); err != nil {
×
UNCOV
1389
                                klog.Errorf("gc lsp error: %v", err)
×
UNCOV
1390
                        }
×
1391
                }, time.Duration(c.config.GCInterval)*time.Second, ctx.Done())
1392
        }
1393

1394
        go wait.Until(func() {
×
1395
                if err := c.inspectPod(); err != nil {
×
UNCOV
1396
                        klog.Errorf("inspection error: %v", err)
×
UNCOV
1397
                }
×
1398
        }, time.Duration(c.config.InspectInterval)*time.Second, ctx.Done())
1399

1400
        if c.config.EnableExternalVpc {
×
1401
                go wait.Until(func() {
×
UNCOV
1402
                        c.syncExternalVpc()
×
UNCOV
1403
                }, 5*time.Second, ctx.Done())
×
1404
        }
1405

1406
        go wait.Until(c.resyncProviderNetworkStatus, 30*time.Second, ctx.Done())
×
1407
        go wait.Until(c.exportSubnetMetrics, 30*time.Second, ctx.Done())
×
1408
        go wait.Until(c.checkSubnetGateway, 5*time.Second, ctx.Done())
×
1409
        go wait.Until(c.syncDistributedSubnetRoutes, 5*time.Second, ctx.Done())
×
1410

×
1411
        go wait.Until(runWorker("add ovn eip", c.addOvnEipQueue, c.handleAddOvnEip), time.Second, ctx.Done())
×
1412
        go wait.Until(runWorker("update ovn eip", c.updateOvnEipQueue, c.handleUpdateOvnEip), time.Second, ctx.Done())
×
1413
        go wait.Until(runWorker("reset ovn eip", c.resetOvnEipQueue, c.handleResetOvnEip), time.Second, ctx.Done())
×
1414
        go wait.Until(runWorker("delete ovn eip", c.delOvnEipQueue, c.handleDelOvnEip), time.Second, ctx.Done())
×
1415

×
1416
        go wait.Until(runWorker("add ovn fip", c.addOvnFipQueue, c.handleAddOvnFip), time.Second, ctx.Done())
×
1417
        go wait.Until(runWorker("update ovn fip", c.updateOvnFipQueue, c.handleUpdateOvnFip), time.Second, ctx.Done())
×
1418
        go wait.Until(runWorker("delete ovn fip", c.delOvnFipQueue, c.handleDelOvnFip), time.Second, ctx.Done())
×
1419

×
1420
        go wait.Until(runWorker("add ovn snat rule", c.addOvnSnatRuleQueue, c.handleAddOvnSnatRule), time.Second, ctx.Done())
×
1421
        go wait.Until(runWorker("update ovn snat rule", c.updateOvnSnatRuleQueue, c.handleUpdateOvnSnatRule), time.Second, ctx.Done())
×
1422
        go wait.Until(runWorker("delete ovn snat rule", c.delOvnSnatRuleQueue, c.handleDelOvnSnatRule), time.Second, ctx.Done())
×
1423

×
1424
        go wait.Until(runWorker("add ovn dnat", c.addOvnDnatRuleQueue, c.handleAddOvnDnatRule), time.Second, ctx.Done())
×
1425
        go wait.Until(runWorker("update ovn dnat", c.updateOvnDnatRuleQueue, c.handleUpdateOvnDnatRule), time.Second, ctx.Done())
×
1426
        go wait.Until(runWorker("delete ovn dnat", c.delOvnDnatRuleQueue, c.handleDelOvnDnatRule), time.Second, ctx.Done())
×
1427

×
1428
        go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
×
1429

×
1430
        go wait.Until(runWorker("add ip", c.addIPQueue, c.handleAddReservedIP), time.Second, ctx.Done())
×
1431
        go wait.Until(runWorker("update ip", c.updateIPQueue, c.handleUpdateIP), time.Second, ctx.Done())
×
1432
        go wait.Until(runWorker("delete ip", c.delIPQueue, c.handleDelIP), time.Second, ctx.Done())
×
1433

×
1434
        go wait.Until(runWorker("add vip", c.addVirtualIPQueue, c.handleAddVirtualIP), time.Second, ctx.Done())
×
1435
        go wait.Until(runWorker("update vip", c.updateVirtualIPQueue, c.handleUpdateVirtualIP), time.Second, ctx.Done())
×
1436
        go wait.Until(runWorker("update virtual parent for vip", c.updateVirtualParentsQueue, c.handleUpdateVirtualParents), time.Second, ctx.Done())
×
1437
        go wait.Until(runWorker("delete vip", c.delVirtualIPQueue, c.handleDelVirtualIP), time.Second, ctx.Done())
×
1438

×
1439
        go wait.Until(runWorker("add iptables eip", c.addIptablesEipQueue, c.handleAddIptablesEip), time.Second, ctx.Done())
×
1440
        go wait.Until(runWorker("update iptables eip", c.updateIptablesEipQueue, c.handleUpdateIptablesEip), time.Second, ctx.Done())
×
1441
        go wait.Until(runWorker("reset iptables eip", c.resetIptablesEipQueue, c.handleResetIptablesEip), time.Second, ctx.Done())
×
1442
        go wait.Until(runWorker("delete iptables eip", c.delIptablesEipQueue, c.handleDelIptablesEip), time.Second, ctx.Done())
×
1443

×
1444
        go wait.Until(runWorker("add iptables fip", c.addIptablesFipQueue, c.handleAddIptablesFip), time.Second, ctx.Done())
×
1445
        go wait.Until(runWorker("update iptables fip", c.updateIptablesFipQueue, c.handleUpdateIptablesFip), time.Second, ctx.Done())
×
1446
        go wait.Until(runWorker("delete iptables fip", c.delIptablesFipQueue, c.handleDelIptablesFip), time.Second, ctx.Done())
×
1447

×
1448
        go wait.Until(runWorker("add iptables dnat rule", c.addIptablesDnatRuleQueue, c.handleAddIptablesDnatRule), time.Second, ctx.Done())
×
1449
        go wait.Until(runWorker("update iptables dnat rule", c.updateIptablesDnatRuleQueue, c.handleUpdateIptablesDnatRule), time.Second, ctx.Done())
×
1450
        go wait.Until(runWorker("delete iptables dnat rule", c.delIptablesDnatRuleQueue, c.handleDelIptablesDnatRule), time.Second, ctx.Done())
×
1451

×
1452
        go wait.Until(runWorker("add iptables snat rule", c.addIptablesSnatRuleQueue, c.handleAddIptablesSnatRule), time.Second, ctx.Done())
×
1453
        go wait.Until(runWorker("update iptables snat rule", c.updateIptablesSnatRuleQueue, c.handleUpdateIptablesSnatRule), time.Second, ctx.Done())
×
1454
        go wait.Until(runWorker("delete iptables snat rule", c.delIptablesSnatRuleQueue, c.handleDelIptablesSnatRule), time.Second, ctx.Done())
×
1455

×
1456
        go wait.Until(runWorker("add qos policy", c.addQoSPolicyQueue, c.handleAddQoSPolicy), time.Second, ctx.Done())
×
1457
        go wait.Until(runWorker("update qos policy", c.updateQoSPolicyQueue, c.handleUpdateQoSPolicy), time.Second, ctx.Done())
×
1458
        go wait.Until(runWorker("delete qos policy", c.delQoSPolicyQueue, c.handleDelQoSPolicy), time.Second, ctx.Done())
×
1459

×
1460
        if c.config.EnableANP {
×
1461
                go wait.Until(runWorker("add admin network policy", c.addAnpQueue, c.handleAddAnp), time.Second, ctx.Done())
×
1462
                go wait.Until(runWorker("update admin network policy", c.updateAnpQueue, c.handleUpdateAnp), time.Second, ctx.Done())
×
1463
                go wait.Until(runWorker("delete admin network policy", c.deleteAnpQueue, c.handleDeleteAnp), time.Second, ctx.Done())
×
1464

×
1465
                go wait.Until(runWorker("add base admin network policy", c.addBanpQueue, c.handleAddBanp), time.Second, ctx.Done())
×
1466
                go wait.Until(runWorker("update base admin network policy", c.updateBanpQueue, c.handleUpdateBanp), time.Second, ctx.Done())
×
1467
                go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done())
×
1468

×
1469
                go wait.Until(runWorker("add cluster network policy", c.addCnpQueue, c.handleAddCnp), time.Second, ctx.Done())
×
1470
                go wait.Until(runWorker("update cluster network policy", c.updateCnpQueue, c.handleUpdateCnp), time.Second, ctx.Done())
×
UNCOV
1471
                go wait.Until(runWorker("delete cluster network policy", c.deleteCnpQueue, c.handleDeleteCnp), time.Second, ctx.Done())
×
1472
        }
×
1473

1474
        if c.config.EnableDNSNameResolver {
×
1475
                go wait.Until(runWorker("add or update dns name resolver", c.addOrUpdateDNSNameResolverQueue, c.handleAddOrUpdateDNSNameResolver), time.Second, ctx.Done())
×
UNCOV
1476
                go wait.Until(runWorker("delete dns name resolver", c.deleteDNSNameResolverQueue, c.handleDeleteDNSNameResolver), time.Second, ctx.Done())
×
1477
        }
×
1478

1479
        if c.config.EnableLiveMigrationOptimize {
×
UNCOV
1480
                go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
×
1481
        }
×
1482

1483
        go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
×
UNCOV
1484

×
UNCOV
1485
        go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
×
1486
}
1487

1488
func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
2✔
1489
        for _, lsName := range subnets {
4✔
1490
                exist, err := c.OVNNbClient.LogicalSwitchExists(lsName)
2✔
1491
                if err != nil {
2✔
1492
                        klog.Error(err)
×
UNCOV
1493
                        return false, fmt.Errorf("check logical switch %s exist: %w", lsName, err)
×
UNCOV
1494
                }
×
1495

1496
                if !exist {
4✔
1497
                        return false, nil
2✔
1498
                }
2✔
1499
        }
1500

1501
        return true, nil
2✔
1502
}
1503

1504
func (c *Controller) initResourceOnce() {
×
1505
        c.registerSubnetMetrics()
×
1506

×
1507
        if err := c.initNodeChassis(); err != nil {
×
UNCOV
1508
                util.LogFatalAndExit(err, "failed to initialize node chassis")
×
1509
        }
×
1510

1511
        if err := c.initDefaultDenyAllSecurityGroup(); err != nil {
×
1512
                util.LogFatalAndExit(err, "failed to initialize 'deny_all' security group")
×
1513
        }
×
1514
        if err := c.syncSecurityGroup(); err != nil {
×
UNCOV
1515
                util.LogFatalAndExit(err, "failed to sync security group")
×
1516
        }
×
1517

1518
        if err := c.syncVpcNatGatewayCR(); err != nil {
×
UNCOV
1519
                util.LogFatalAndExit(err, "failed to sync crd vpc nat gateways")
×
1520
        }
×
1521

1522
        if err := c.initVpcNatGw(); err != nil {
×
1523
                util.LogFatalAndExit(err, "failed to initialize vpc nat gateways")
×
1524
        }
×
1525
        if c.config.EnableLb {
×
1526
                if err := c.initVpcDNSConfig(); err != nil {
×
UNCOV
1527
                        util.LogFatalAndExit(err, "failed to initialize vpc-dns")
×
UNCOV
1528
                }
×
1529
        }
1530

1531
        // remove resources in ovndb that not exist any more in kubernetes resources
1532
        // process gc at last in case of affecting other init process
1533
        if err := c.gc(); err != nil {
×
UNCOV
1534
                util.LogFatalAndExit(err, "failed to run gc")
×
UNCOV
1535
        }
×
1536
}
1537

1538
func processNextWorkItem[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error, getItemKey func(any) string) bool {
×
1539
        item, shutdown := queue.Get()
×
1540
        if shutdown {
×
UNCOV
1541
                return false
×
1542
        }
×
1543

1544
        err := func(item T) error {
×
1545
                defer queue.Done(item)
×
1546
                if err := handler(item); err != nil {
×
1547
                        queue.AddRateLimited(item)
×
1548
                        return fmt.Errorf("error syncing %s %q: %w, requeuing", action, getItemKey(item), err)
×
1549
                }
×
UNCOV
1550
                queue.Forget(item)
×
1551
                return nil
×
1552
        }(item)
1553
        if err != nil {
×
1554
                utilruntime.HandleError(err)
×
1555
                return true
×
UNCOV
1556
        }
×
UNCOV
1557
        return true
×
1558
}
1559

1560
func getWorkItemKey(obj any) string {
×
1561
        switch v := obj.(type) {
×
1562
        case string:
×
1563
                return v
×
1564
        case *vpcService:
×
1565
                return cache.MetaObjectToName(obj.(*vpcService).Svc).String()
×
1566
        case *AdminNetworkPolicyChangedDelta:
×
1567
                return v.key
×
1568
        case *SlrInfo:
×
1569
                return v.Name
×
1570
        default:
×
1571
                key, err := cache.MetaNamespaceKeyFunc(obj)
×
1572
                if err != nil {
×
1573
                        utilruntime.HandleError(err)
×
1574
                        return ""
×
UNCOV
1575
                }
×
UNCOV
1576
                return key
×
1577
        }
1578
}
1579

1580
func runWorker[T comparable](action string, queue workqueue.TypedRateLimitingInterface[T], handler func(T) error) func() {
×
1581
        return func() {
×
UNCOV
1582
                for processNextWorkItem(action, queue, handler, getWorkItemKey) {
×
UNCOV
1583
                }
×
1584
        }
1585
}
1586

1587
// apiResourceExists checks if all specified kinds exist in the given group version.
1588
// It returns true if all kinds are found, false otherwise.
1589
// Parameters:
1590
// - discoveryClient: The discovery client to use for querying API resources.
1591
// - gv: The group version string (e.g., "apps/v1").
1592
// - kinds: A variadic list of kind names to check for existence (e.g., "Deployment", "StatefulSet").
1593
func apiResourceExists(discoveryClient discovery.DiscoveryInterface, gv string, kinds ...string) (bool, error) {
×
1594
        apiResourceLists, err := discoveryClient.ServerResourcesForGroupVersion(gv)
×
1595
        if err != nil {
×
1596
                if k8serrors.IsNotFound(err) {
×
1597
                        return false, nil
×
UNCOV
1598
                }
×
UNCOV
1599
                return false, fmt.Errorf("failed to discover api resources for %s: %w", gv, err)
×
1600
        }
1601

1602
        existingKinds := set.New[string]()
×
1603
        for _, apiResource := range apiResourceLists.APIResources {
×
UNCOV
1604
                existingKinds.Insert(apiResource.Kind)
×
1605
        }
×
1606

UNCOV
1607
        return existingKinds.HasAll(kinds...), nil
×
1608
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc