• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 15585629803

11 Jun 2025 01:02PM UTC coverage: 21.756%. First build
15585629803

Pull #5348

github

zhangzujian
fix vpc egress gateway not applied to new pods

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #5348: fix vpc egress gateway not applied to new pods

0 of 11 new or added lines in 1 file covered. (0.0%)

10427 of 47927 relevant lines covered (21.76%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/vpc_egress_gateway.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "maps"
7
        "reflect"
8
        "slices"
9
        "strconv"
10
        "strings"
11

12
        nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
13
        appsv1 "k8s.io/api/apps/v1"
14
        corev1 "k8s.io/api/core/v1"
15
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/labels"
18
        "k8s.io/apimachinery/pkg/util/intstr"
19
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20
        "k8s.io/client-go/tools/cache"
21
        "k8s.io/klog/v2"
22
        "k8s.io/utils/ptr"
23
        "k8s.io/utils/set"
24
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
25

26
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
27
        "github.com/kubeovn/kube-ovn/pkg/ovs"
28
        "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
29
        "github.com/kubeovn/kube-ovn/pkg/util"
30
)
31

32
func (c *Controller) enqueueAddVpcEgressGateway(obj any) {
×
33
        key := cache.MetaObjectToName(obj.(*kubeovnv1.VpcEgressGateway)).String()
×
34
        klog.V(3).Infof("enqueue add vpc-egress-gateway %s", key)
×
35
        c.addOrUpdateVpcEgressGatewayQueue.Add(key)
×
36
}
×
37

38
func (c *Controller) enqueueUpdateVpcEgressGateway(_, newObj any) {
×
39
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.VpcEgressGateway)).String()
×
40
        klog.V(3).Infof("enqueue update vpc-egress-gateway %s", key)
×
41
        c.addOrUpdateVpcEgressGatewayQueue.Add(key)
×
42
}
×
43

44
func (c *Controller) enqueueDeleteVpcEgressGateway(obj any) {
×
45
        key := cache.MetaObjectToName(obj.(*kubeovnv1.VpcEgressGateway)).String()
×
46
        klog.V(3).Infof("enqueue delete vpc-egress-gateway %s", key)
×
47
        c.delVpcEgressGatewayQueue.Add(key)
×
48
}
×
49

50
func vegWorkloadLabels(vegName string) map[string]string {
×
51
        return map[string]string{"app": "vpc-egress-gateway", util.VpcEgressGatewayLabel: vegName}
×
52
}
×
53

54
func (c *Controller) handleAddOrUpdateVpcEgressGateway(key string) error {
×
55
        ns, name, err := cache.SplitMetaNamespaceKey(key)
×
56
        if err != nil {
×
57
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
58
                return nil
×
59
        }
×
60

61
        c.vpcEgressGatewayKeyMutex.LockKey(key)
×
62
        defer func() { _ = c.vpcEgressGatewayKeyMutex.UnlockKey(key) }()
×
63

64
        cachedGateway, err := c.vpcEgressGatewayLister.VpcEgressGateways(ns).Get(name)
×
65
        if err != nil {
×
66
                if !k8serrors.IsNotFound(err) {
×
67
                        klog.Error(err)
×
68
                        return err
×
69
                }
×
70
                return nil
×
71
        }
72

73
        if !cachedGateway.DeletionTimestamp.IsZero() {
×
74
                c.delVpcEgressGatewayQueue.Add(key)
×
75
                return nil
×
76
        }
×
77

78
        klog.Infof("reconciling vpc-egress-gateway %s", key)
×
79
        gw := cachedGateway.DeepCopy()
×
80
        if gw, err = c.initVpcEgressGatewayStatus(gw); err != nil {
×
81
                return err
×
82
        }
×
83

84
        vpcName := gw.Spec.VPC
×
85
        if vpcName == "" {
×
86
                vpcName = c.config.ClusterRouter
×
87
        }
×
88
        vpc, err := c.vpcsLister.Get(vpcName)
×
89
        if err != nil {
×
90
                klog.Error(err)
×
91
                return err
×
92
        }
×
93
        if gw.Spec.BFD.Enabled && vpc.Status.BFDPort.IP == "" {
×
94
                err = fmt.Errorf("vpc %s bfd port is not enabled or not ready", vpc.Name)
×
95
                klog.Error(err)
×
96
                gw.Status.Conditions.SetCondition(kubeovnv1.Validated, corev1.ConditionFalse, "VpcBfdPortNotEnabled", err.Error(), gw.Generation)
×
97
                _, _ = c.updateVpcEgressGatewayStatus(gw)
×
98
                return err
×
99
        }
×
100

101
        if controllerutil.AddFinalizer(gw, util.KubeOVNControllerFinalizer) {
×
102
                updatedGateway, err := c.config.KubeOvnClient.KubeovnV1().VpcEgressGateways(gw.Namespace).
×
103
                        Update(context.Background(), gw, metav1.UpdateOptions{})
×
104
                if err != nil {
×
105
                        err = fmt.Errorf("failed to add finalizer for vpc-egress-gateway %s/%s: %w", gw.Namespace, gw.Name, err)
×
106
                        klog.Error(err)
×
107
                        return err
×
108
                }
×
109
                gw = updatedGateway
×
110
        }
111

112
        var bfdIP, bfdIPv4, bfdIPv6 string
×
113
        if gw.Spec.BFD.Enabled {
×
114
                bfdIP = vpc.Status.BFDPort.IP
×
115
                bfdIPv4, bfdIPv6 = util.SplitStringIP(bfdIP)
×
116
        }
×
117

118
        // reconcile the vpc egress gateway workload and get the route sources for later OVN resources reconciliation
119
        attachmentNetworkName, ipv4Src, ipv6Src, deploy, err := c.reconcileVpcEgressGatewayWorkload(gw, vpc, bfdIP, bfdIPv4, bfdIPv6)
×
120
        gw.Status.Replicas = gw.Spec.Replicas
×
121
        gw.Status.LabelSelector = labels.FormatLabels(vegWorkloadLabels(gw.Name))
×
122
        if err != nil {
×
123
                klog.Error(err)
×
124
                gw.Status.Replicas = 0
×
125
                gw.Status.Conditions.SetCondition(kubeovnv1.Ready, corev1.ConditionFalse, "ReconcileWorkloadFailed", err.Error(), gw.Generation)
×
126
                _, _ = c.updateVpcEgressGatewayStatus(gw)
×
127
                return err
×
128
        }
×
129

130
        gw.Status.InternalIPs = nil
×
131
        gw.Status.ExternalIPs = nil
×
132
        gw.Status.Workload.APIVersion = deploy.APIVersion
×
133
        gw.Status.Workload.Kind = deploy.Kind
×
134
        gw.Status.Workload.Name = deploy.Name
×
135
        gw.Status.Workload.Nodes = nil
×
136
        nodeNexthopIPv4 := make(map[string]string, int(gw.Spec.Replicas))
×
137
        nodeNexthopIPv6 := make(map[string]string, int(gw.Spec.Replicas))
×
138
        if !util.DeploymentIsReady(deploy) {
×
139
                gw.Status.Ready = false
×
140
                msg := fmt.Sprintf("Waiting for %s %s to be ready", deploy.Kind, deploy.Name)
×
141
                gw.Status.Conditions.SetCondition(kubeovnv1.Ready, corev1.ConditionFalse, "Processing", msg, gw.Generation)
×
142
        } else {
×
143
                // get the pods of the deployment to collect the pod IPs
×
144
                podSelector, err := metav1.LabelSelectorAsSelector(deploy.Spec.Selector)
×
145
                if err != nil {
×
146
                        err = fmt.Errorf("failed to get pod selector of deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
147
                        klog.Error(err)
×
148
                        return err
×
149
                }
×
150

151
                pods, err := c.podsLister.Pods(deploy.Namespace).List(podSelector)
×
152
                if err != nil {
×
153
                        err = fmt.Errorf("failed to list pods of deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
154
                        klog.Error(err)
×
155
                        return err
×
156
                }
×
157

158
                // update gateway status including the internal/external IPs and the nodes where the pods are running
159
                gw.Status.Workload.Nodes = make([]string, 0, len(pods))
×
160
                for _, pod := range pods {
×
161
                        gw.Status.Workload.Nodes = append(gw.Status.Workload.Nodes, pod.Spec.NodeName)
×
162
                        ips := util.PodIPs(*pod)
×
163
                        ipv4, ipv6 := util.SplitIpsByProtocol(ips)
×
164
                        if len(ipv4) != 0 {
×
165
                                nodeNexthopIPv4[pod.Spec.NodeName] = ipv4[0]
×
166
                        }
×
167
                        if len(ipv6) != 0 {
×
168
                                nodeNexthopIPv6[pod.Spec.NodeName] = ipv6[0]
×
169
                        }
×
170
                        gw.Status.InternalIPs = append(gw.Status.InternalIPs, strings.Join(ips, ","))
×
171
                        extIPs, err := util.PodAttachmentIPs(pod, attachmentNetworkName)
×
172
                        if err != nil {
×
173
                                klog.Error(err)
×
174
                                gw.Status.ExternalIPs = append(gw.Status.ExternalIPs, "<unknown>")
×
175
                                continue
×
176
                        }
177
                        gw.Status.ExternalIPs = append(gw.Status.ExternalIPs, strings.Join(extIPs, ","))
×
178
                }
179
        }
180
        if gw, err = c.updateVpcEgressGatewayStatus(gw); err != nil {
×
181
                klog.Error(err)
×
182
                return err
×
183
        }
×
184
        if len(gw.Status.Workload.Nodes) == 0 {
×
185
                // the workload is not ready yet
×
186
                return nil
×
187
        }
×
188

189
        // reconcile OVN routes
190
        if err = c.reconcileVpcEgressGatewayOVNRoutes(gw, 4, vpc.Status.Router, vpc.Status.BFDPort.Name, bfdIPv4, nodeNexthopIPv4, ipv4Src); err != nil {
×
191
                klog.Error(err)
×
192
                return err
×
193
        }
×
194
        if err = c.reconcileVpcEgressGatewayOVNRoutes(gw, 6, vpc.Status.Router, vpc.Status.BFDPort.Name, bfdIPv6, nodeNexthopIPv6, ipv6Src); err != nil {
×
195
                klog.Error(err)
×
196
                return err
×
197
        }
×
198

199
        gw.Status.Ready = true
×
200
        gw.Status.Phase = kubeovnv1.PhaseCompleted
×
201
        gw.Status.Conditions.SetReady("ReconcileSuccess", gw.Generation)
×
202
        if _, err = c.updateVpcEgressGatewayStatus(gw); err != nil {
×
203
                return err
×
204
        }
×
205

206
        return nil
×
207
}
208

209
func (c *Controller) initVpcEgressGatewayStatus(gw *kubeovnv1.VpcEgressGateway) (*kubeovnv1.VpcEgressGateway, error) {
×
210
        var err error
×
211
        if gw.Status.Phase == "" || gw.Status.Phase == kubeovnv1.PhasePending {
×
212
                gw.Status.Phase = kubeovnv1.PhaseProcessing
×
213
                gw, err = c.updateVpcEgressGatewayStatus(gw)
×
214
        }
×
215
        return gw, err
×
216
}
217

218
func (c *Controller) updateVpcEgressGatewayStatus(gw *kubeovnv1.VpcEgressGateway) (*kubeovnv1.VpcEgressGateway, error) {
×
219
        if len(gw.Status.Conditions) == 0 {
×
220
                gw.Status.Conditions.SetCondition(kubeovnv1.Init, corev1.ConditionUnknown, "Processing", "", gw.Generation)
×
221
        }
×
222
        if !gw.Status.Ready {
×
223
                gw.Status.Phase = kubeovnv1.PhaseProcessing
×
224
        }
×
225

226
        updateGateway, err := c.config.KubeOvnClient.KubeovnV1().VpcEgressGateways(gw.Namespace).
×
227
                UpdateStatus(context.Background(), gw, metav1.UpdateOptions{})
×
228
        if err != nil {
×
229
                err = fmt.Errorf("failed to update status of vpc-egress-gateway %s/%s: %w", gw.Namespace, gw.Name, err)
×
230
                klog.Error(err)
×
231
                return nil, err
×
232
        }
×
233

234
        return updateGateway, nil
×
235
}
236

237
// create or update vpc egress gateway workload
238
func (c *Controller) reconcileVpcEgressGatewayWorkload(gw *kubeovnv1.VpcEgressGateway, vpc *kubeovnv1.Vpc, bfdIP, bfdIPv4, bfdIPv6 string) (string, set.Set[string], set.Set[string], *appsv1.Deployment, error) {
×
239
        image := c.config.Image
×
240
        if gw.Spec.Image != "" {
×
241
                image = gw.Spec.Image
×
242
        }
×
243
        if image == "" {
×
244
                err := fmt.Errorf("no image specified for vpc egress gateway %s/%s", gw.Namespace, gw.Name)
×
245
                klog.Error(err)
×
246
                return "", nil, nil, nil, err
×
247
        }
×
248

249
        if len(gw.Spec.InternalIPs) != 0 && len(gw.Spec.InternalIPs) < int(gw.Spec.Replicas) {
×
250
                err := fmt.Errorf("internal IPs count %d is less than replicas %d", len(gw.Spec.InternalIPs), gw.Spec.Replicas)
×
251
                klog.Error(err)
×
252
                return "", nil, nil, nil, err
×
253
        }
×
254
        if len(gw.Spec.ExternalIPs) != 0 && len(gw.Spec.ExternalIPs) < int(gw.Spec.Replicas) {
×
255
                err := fmt.Errorf("external IPs count %d is less than replicas %d", len(gw.Spec.ExternalIPs), gw.Spec.Replicas)
×
256
                klog.Error(err)
×
257
                return "", nil, nil, nil, err
×
258
        }
×
259

260
        internalSubnet := gw.Spec.InternalSubnet
×
261
        if internalSubnet == "" {
×
262
                internalSubnet = vpc.Status.DefaultLogicalSwitch
×
263
        }
×
264
        if internalSubnet == "" {
×
265
                err := fmt.Errorf("default subnet of vpc %s not found, please set internal subnet of the egress gateway", vpc.Name)
×
266
                klog.Error(err)
×
267
                return "", nil, nil, nil, err
×
268
        }
×
269
        intSubnet, err := c.subnetsLister.Get(internalSubnet)
×
270
        if err != nil {
×
271
                klog.Error(err)
×
272
                return "", nil, nil, nil, err
×
273
        }
×
274
        extSubnet, err := c.subnetsLister.Get(gw.Spec.ExternalSubnet)
×
275
        if err != nil {
×
276
                klog.Error(err)
×
277
                return "", nil, nil, nil, err
×
278
        }
×
279
        if !strings.ContainsRune(extSubnet.Spec.Provider, '.') {
×
280
                err = fmt.Errorf("please set correct provider of subnet %s to get the network-attachment-definition", extSubnet.Name)
×
281
                klog.Error(err)
×
282
                return "", nil, nil, nil, err
×
283
        }
×
284
        subStrings := strings.Split(extSubnet.Spec.Provider, ".")
×
285
        nadName, nadNamespace := subStrings[0], subStrings[1]
×
286
        if _, err = c.config.AttachNetClient.K8sCniCncfIoV1().NetworkAttachmentDefinitions(nadNamespace).
×
287
                Get(context.Background(), nadName, metav1.GetOptions{}); err != nil {
×
288
                klog.Errorf("failed to get net-attach-def %s/%s: %v", nadNamespace, nadName, err)
×
289
                return "", nil, nil, nil, err
×
290
        }
×
291
        attachmentNetworkName := fmt.Sprintf("%s/%s", nadNamespace, nadName)
×
292

×
293
        // collect egress policies
×
294
        ipv4ForwardSrc, ipv6ForwardSrc := set.New[string](), set.New[string]()
×
295
        ipv4SNATSrc, ipv6SNATSrc := set.New[string](), set.New[string]()
×
296
        for _, policy := range gw.Spec.Policies {
×
297
                ipv4, ipv6 := util.SplitIpsByProtocol(policy.IPBlocks)
×
298
                if policy.SNAT {
×
299
                        ipv4SNATSrc.Insert(ipv4...)
×
300
                        ipv6SNATSrc.Insert(ipv6...)
×
301
                } else {
×
302
                        ipv4ForwardSrc.Insert(ipv4...)
×
303
                        ipv6ForwardSrc.Insert(ipv6...)
×
304
                }
×
305
                for _, subnetName := range policy.Subnets {
×
306
                        subnet, err := c.subnetsLister.Get(subnetName)
×
307
                        if err != nil {
×
308
                                klog.Error(err)
×
309
                                return attachmentNetworkName, nil, nil, nil, err
×
310
                        }
×
311
                        if subnet.Status.IsNotValidated() {
×
312
                                err = fmt.Errorf("subnet %s is not validated", subnet.Name)
×
313
                                klog.Error(err)
×
314
                                return attachmentNetworkName, nil, nil, nil, err
×
315
                        }
×
316
                        // TODO: check subnet's vpc and vlan
317
                        ipv4, ipv6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
318
                        if policy.SNAT {
×
319
                                ipv4SNATSrc.Insert(ipv4)
×
320
                                ipv6SNATSrc.Insert(ipv6)
×
321
                        } else {
×
322
                                ipv4ForwardSrc.Insert(ipv4)
×
323
                                ipv6ForwardSrc.Insert(ipv6)
×
324
                        }
×
325
                }
326
        }
327

328
        // calculate internal route destinations and forward source CIDR blocks
329
        intRouteDstIPv4, intRouteDstIPv6 := ipv4ForwardSrc.Union(ipv4SNATSrc), ipv6ForwardSrc.Union(ipv6SNATSrc)
×
330
        intRouteDstIPv4.Delete("")
×
331
        intRouteDstIPv6.Delete("")
×
332
        ipv4ForwardSrc.Delete("")
×
333
        ipv6ForwardSrc.Delete("")
×
334

×
335
        // generate route annotations used to configure routes in the pod
×
336
        routes := util.NewPodRoutes()
×
337
        intGatewayIPv4, intGatewayIPv6 := util.SplitStringIP(intSubnet.Spec.Gateway)
×
338
        extGatewayIPv4, extGatewayIPv6 := util.SplitStringIP(extSubnet.Spec.Gateway)
×
339
        // add routes for the VPC BFD Port so that the egress gateway can establish BFD session(s) with it
×
340
        routes.Add(util.OvnProvider, bfdIPv4, intGatewayIPv4)
×
341
        routes.Add(util.OvnProvider, bfdIPv6, intGatewayIPv6)
×
342
        // add routes for the internal networks
×
343
        for _, dst := range intRouteDstIPv4.UnsortedList() {
×
344
                routes.Add(util.OvnProvider, dst, intGatewayIPv4)
×
345
        }
×
346
        for _, dst := range intRouteDstIPv6.UnsortedList() {
×
347
                routes.Add(util.OvnProvider, dst, intGatewayIPv6)
×
348
        }
×
349
        // add default routes to forward traffic to the external network
350
        routes.Add(extSubnet.Spec.Provider, "0.0.0.0/0", extGatewayIPv4)
×
351
        routes.Add(extSubnet.Spec.Provider, "::/0", extGatewayIPv6)
×
352

×
353
        // generate pod annotations
×
354
        annotations, err := routes.ToAnnotations()
×
355
        if err != nil {
×
356
                klog.Error(err)
×
357
                return attachmentNetworkName, nil, nil, nil, err
×
358
        }
×
359
        annotations[nadv1.NetworkAttachmentAnnot] = attachmentNetworkName
×
360
        annotations[util.LogicalSwitchAnnotation] = intSubnet.Name
×
361
        if len(gw.Spec.InternalIPs) != 0 {
×
362
                // set internal IPs
×
363
                annotations[util.IPPoolAnnotation] = strings.Join(gw.Spec.InternalIPs, ";")
×
364
        }
×
365
        if len(gw.Spec.ExternalIPs) != 0 {
×
366
                // set external IPs
×
367
                annotations[fmt.Sprintf(util.IPPoolAnnotationTemplate, extSubnet.Spec.Provider)] = strings.Join(gw.Spec.ExternalIPs, ";")
×
368
        }
×
369

370
        // generate init container environment variables
371
        // the init container is responsible for adding routes and SNAT rules to the pod network namespace
372
        initEnv, err := vpcEgressGatewayInitContainerEnv(4, intGatewayIPv4, extGatewayIPv4, ipv4ForwardSrc)
×
373
        if err != nil {
×
374
                klog.Error(err)
×
375
                return attachmentNetworkName, nil, nil, nil, err
×
376
        }
×
377
        ipv6Env, err := vpcEgressGatewayInitContainerEnv(6, intGatewayIPv6, extGatewayIPv6, ipv6ForwardSrc)
×
378
        if err != nil {
×
379
                klog.Error(err)
×
380
                return attachmentNetworkName, nil, nil, nil, err
×
381
        }
×
382
        initEnv = append(initEnv, ipv6Env...)
×
383

×
384
        // generate workload
×
385
        labels := vegWorkloadLabels(gw.Name)
×
386
        deploy := &appsv1.Deployment{
×
387
                ObjectMeta: metav1.ObjectMeta{
×
388
                        Name:      gw.Spec.Prefix + gw.Name,
×
389
                        Namespace: gw.Namespace,
×
390
                        Labels:    labels,
×
391
                },
×
392
                Spec: appsv1.DeploymentSpec{
×
393
                        Selector: &metav1.LabelSelector{
×
394
                                MatchLabels: labels,
×
395
                        },
×
396
                        Strategy: appsv1.DeploymentStrategy{
×
397
                                Type: appsv1.RollingUpdateDeploymentStrategyType,
×
398
                                RollingUpdate: &appsv1.RollingUpdateDeployment{
×
399
                                        MaxUnavailable: ptr.To(intstr.FromInt(1)),
×
400
                                        MaxSurge:       ptr.To(intstr.FromInt(0)),
×
401
                                },
×
402
                        },
×
403
                        Template: corev1.PodTemplateSpec{
×
404
                                ObjectMeta: metav1.ObjectMeta{
×
405
                                        Labels:      labels,
×
406
                                        Annotations: annotations,
×
407
                                },
×
408
                                Spec: corev1.PodSpec{
×
409
                                        Affinity: &corev1.Affinity{
×
410
                                                NodeAffinity: &corev1.NodeAffinity{
×
411
                                                        RequiredDuringSchedulingIgnoredDuringExecution: mergeNodeSelector(gw.Spec.NodeSelector),
×
412
                                                },
×
413
                                                PodAntiAffinity: &corev1.PodAntiAffinity{
×
414
                                                        RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
×
415
                                                                LabelSelector: &metav1.LabelSelector{
×
416
                                                                        MatchLabels: labels,
×
417
                                                                },
×
418
                                                                TopologyKey: corev1.LabelHostname,
×
419
                                                        }},
×
420
                                                },
×
421
                                        },
×
422
                                        InitContainers: []corev1.Container{{
×
423
                                                Name:            "init",
×
424
                                                Image:           image,
×
425
                                                ImagePullPolicy: corev1.PullIfNotPresent,
×
426
                                                Command:         []string{"bash", "/kube-ovn/init-vpc-egress-gateway.sh"},
×
427
                                                Env:             initEnv,
×
428
                                                SecurityContext: &corev1.SecurityContext{
×
429
                                                        Privileged: ptr.To(true),
×
430
                                                },
×
431
                                                VolumeMounts: []corev1.VolumeMount{{
×
432
                                                        Name:      "usr-local-sbin",
×
433
                                                        MountPath: "/usr/local/sbin",
×
434
                                                }},
×
435
                                        }},
×
436
                                        Containers: []corev1.Container{{
×
437
                                                Name:            "gateway",
×
438
                                                Image:           image,
×
439
                                                ImagePullPolicy: corev1.PullIfNotPresent,
×
440
                                                Command:         []string{"sleep", "infinity"},
×
441
                                                SecurityContext: &corev1.SecurityContext{
×
442
                                                        Privileged: ptr.To(false),
×
443
                                                        RunAsUser:  ptr.To[int64](65534),
×
444
                                                        Capabilities: &corev1.Capabilities{
×
445
                                                                Add:  []corev1.Capability{"NET_ADMIN", "NET_RAW"},
×
446
                                                                Drop: []corev1.Capability{"ALL"},
×
447
                                                        },
×
448
                                                },
×
449
                                                VolumeMounts: []corev1.VolumeMount{{
×
450
                                                        Name:      "usr-local-sbin",
×
451
                                                        MountPath: "/usr/local/sbin",
×
452
                                                }},
×
453
                                        }},
×
454
                                        Volumes: []corev1.Volume{{
×
455
                                                Name: "usr-local-sbin",
×
456
                                                VolumeSource: corev1.VolumeSource{
×
457
                                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
×
458
                                                },
×
459
                                        }},
×
460
                                        TerminationGracePeriodSeconds: ptr.To[int64](0),
×
461
                                },
×
462
                        },
×
463
                },
×
464
        }
×
465
        // set owner reference so that the workload will be deleted automatically when the vpc egress gateway is deleted
×
466
        if err = util.SetOwnerReference(gw, deploy); err != nil {
×
467
                klog.Error(err)
×
468
                return attachmentNetworkName, nil, nil, nil, err
×
469
        }
×
470

471
        if bfdIP != "" {
×
472
                // run BFD in the gateway container        to establish BFD session(s) with the VPC BFD LRP
×
473
                container := vpcEgressGatewayContainerBFDD(image, bfdIP, gw.Spec.BFD.MinTX, gw.Spec.BFD.MinRX, gw.Spec.BFD.Multiplier)
×
474
                deploy.Spec.Template.Spec.Containers[0] = container
×
475
        }
×
476

477
        // generate hash for the workload to determine whether to update the existing workload or not
478
        hash, err := util.Sha256HashObject(deploy)
×
479
        if err != nil {
×
480
                err = fmt.Errorf("failed to hash generated deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
481
                klog.Error(err)
×
482
                return attachmentNetworkName, nil, nil, nil, err
×
483
        }
×
484

485
        hash = hash[:12]
×
486
        // replicas and the hash annotation should be excluded from hash calculation
×
487
        deploy.Spec.Replicas = ptr.To(gw.Spec.Replicas)
×
488
        deploy.Annotations = map[string]string{util.GenerateHashAnnotation: hash}
×
489
        if currentDeploy, err := c.deploymentsLister.Deployments(gw.Namespace).Get(deploy.Name); err != nil {
×
490
                if !k8serrors.IsNotFound(err) {
×
491
                        err = fmt.Errorf("failed to get deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
492
                        klog.Error(err)
×
493
                        return attachmentNetworkName, nil, nil, nil, err
×
494
                }
×
495
                if deploy, err = c.config.KubeClient.AppsV1().Deployments(gw.Namespace).
×
496
                        Create(context.Background(), deploy, metav1.CreateOptions{}); err != nil {
×
497
                        err = fmt.Errorf("failed to create deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
498
                        klog.Error(err)
×
499
                        return attachmentNetworkName, nil, nil, nil, err
×
500
                }
×
501
        } else if !reflect.DeepEqual(currentDeploy.Spec.Replicas, deploy.Spec.Replicas) ||
×
502
                currentDeploy.Annotations[util.GenerateHashAnnotation] != hash {
×
503
                // update the deployment if replicas or hash annotation is changed
×
504
                if deploy, err = c.config.KubeClient.AppsV1().Deployments(gw.Namespace).
×
505
                        Update(context.Background(), deploy, metav1.UpdateOptions{}); err != nil {
×
506
                        err = fmt.Errorf("failed to update deployment %s/%s: %w", deploy.Namespace, deploy.Name, err)
×
507
                        klog.Error(err)
×
508
                        return attachmentNetworkName, nil, nil, nil, err
×
509
                }
×
510
        } else {
×
511
                // no need to create or update the deployment
×
512
                deploy = currentDeploy
×
513
        }
×
514

515
        // return the source CIDR blocks for later OVN resources reconciliation
516
        deploy.APIVersion, deploy.Kind = deploymentGroupVersion, deploymentKind
×
517
        return attachmentNetworkName, intRouteDstIPv4, intRouteDstIPv6, deploy, nil
×
518
}
519

520
func (c *Controller) reconcileVpcEgressGatewayOVNRoutes(gw *kubeovnv1.VpcEgressGateway, af int, lrName, lrpName, bfdIP string, nextHops map[string]string, sources set.Set[string]) error {
×
521
        if len(nextHops) == 0 {
×
522
                return nil
×
523
        }
×
524

525
        externalIDs := map[string]string{
×
526
                ovs.ExternalIDVendor:           util.CniTypeName,
×
527
                ovs.ExternalIDVpcEgressGateway: fmt.Sprintf("%s/%s", gw.Namespace, gw.Name),
×
528
                "af":                           strconv.Itoa(af),
×
529
        }
×
530
        bfdList, err := c.OVNNbClient.FindBFD(externalIDs)
×
531
        if err != nil {
×
532
                klog.Error(err)
×
533
                return err
×
534
        }
×
535

536
        // reconcile OVN port group
537
        ports := set.New[string]()
×
538
        for _, selector := range gw.Spec.Selectors {
×
539
                sel := labels.Everything()
×
540
                if selector.NamespaceSelector != nil {
×
541
                        if sel, err = metav1.LabelSelectorAsSelector(selector.NamespaceSelector); err != nil {
×
542
                                err = fmt.Errorf("failed to create label selector for namespace selector %#v: %w", selector.NamespaceSelector, err)
×
543
                                klog.Error(err)
×
544
                                return err
×
545
                        }
×
546
                }
547
                namespaces, err := c.namespacesLister.List(sel)
×
548
                if err != nil {
×
549
                        err = fmt.Errorf("failed to list namespaces with selector %s: %w", sel, err)
×
550
                        klog.Error(err)
×
551
                        return err
×
552
                }
×
553
                sel = labels.Everything()
×
554
                if selector.PodSelector != nil {
×
555
                        if sel, err = metav1.LabelSelectorAsSelector(selector.PodSelector); err != nil {
×
556
                                err = fmt.Errorf("failed to create label selector for pod selector %#v: %w", selector.PodSelector, err)
×
557
                                klog.Error(err)
×
558
                                return err
×
559
                        }
×
560
                }
561
                for _, ns := range namespaces {
×
562
                        pods, err := c.podsLister.Pods(ns.Name).List(sel)
×
563
                        if err != nil {
×
564
                                err = fmt.Errorf("failed to list pods with selector %s in namespace %s: %w", sel, ns.Name, err)
×
565
                                klog.Error(err)
×
566
                                return err
×
567
                        }
×
568
                        for _, pod := range pods {
×
569
                                if pod.Spec.HostNetwork || !isPodAlive(pod) {
×
570
                                        continue
×
571
                                }
572
                                if pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter ||
×
573
                                        pod.Annotations[util.AllocatedAnnotation] != "true" {
×
574
                                        continue
×
575
                                }
576
                                podName := c.getNameByPod(pod)
×
577
                                ports.Insert(ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider))
×
578
                        }
579
                }
580
        }
581
        key := cache.MetaObjectToName(gw).String()
×
582
        pgName := vegPortGroupName(key)
×
583
        if err = c.OVNNbClient.CreatePortGroup(pgName, externalIDs); err != nil {
×
584
                err = fmt.Errorf("failed to create port group %s: %w", pgName, err)
×
585
                klog.Error(err)
×
586
                return err
×
587
        }
×
588
        if err = c.OVNNbClient.PortGroupSetPorts(pgName, ports.UnsortedList()); err != nil {
×
589
                err = fmt.Errorf("failed to set ports of port group %s: %w", pgName, err)
×
590
                klog.Error(err)
×
591
                return err
×
592
        }
×
593

594
        // reconcile OVN address set
595
        asName := vegAddressSetName(key, af)
×
596
        if err = c.OVNNbClient.CreateAddressSet(asName, externalIDs); err != nil {
×
597
                err = fmt.Errorf("failed to create address set %s: %w", asName, err)
×
598
                klog.Error(err)
×
599
                return err
×
600
        }
×
601
        if err = c.OVNNbClient.AddressSetUpdateAddress(asName, sources.SortedList()...); err != nil {
×
602
                err = fmt.Errorf("failed to update address set %s: %w", asName, err)
×
603
                klog.Error(err)
×
604
                return err
×
605
        }
×
606

607
        // reconcile OVN BFD entries
608
        bfdIDs := set.New[string]()
×
609
        bfdDstIPs := set.New(slices.Collect(maps.Values(nextHops))...)
×
610
        bfdMap := make(map[string]string, bfdDstIPs.Len())
×
611
        for _, bfd := range bfdList {
×
612
                if bfdIP == "" || bfd.LogicalPort != lrpName || !bfdDstIPs.Has(bfd.DstIP) {
×
613
                        if err = c.OVNNbClient.DeleteBFD(bfd.UUID); err != nil {
×
614
                                err = fmt.Errorf("failed to delete bfd %s: %w", bfd.UUID, err)
×
615
                                klog.Error(err)
×
616
                                return err
×
617
                        }
×
618
                }
619
                if bfdIP == "" || bfd.LogicalPort == lrpName && bfdDstIPs.Has(bfd.DstIP) {
×
620
                        // TODO: update min_rx, min_tx and multiplier
×
621
                        if bfdIP != "" {
×
622
                                bfdIDs.Insert(bfd.UUID)
×
623
                                bfdMap[bfd.DstIP] = bfd.UUID
×
624
                        }
×
625
                        bfdDstIPs.Delete(bfd.DstIP)
×
626
                }
627
        }
628
        if bfdIP != "" {
×
629
                for _, dstIP := range bfdDstIPs.UnsortedList() {
×
630
                        bfd, err := c.OVNNbClient.CreateBFD(lrpName, dstIP, int(gw.Spec.BFD.MinRX), int(gw.Spec.BFD.MinTX), int(gw.Spec.BFD.Multiplier), externalIDs)
×
631
                        if err != nil {
×
632
                                klog.Error(err)
×
633
                                return err
×
634
                        }
×
635
                        bfdIDs.Insert(bfd.UUID)
×
636
                        bfdMap[bfd.DstIP] = bfd.UUID
×
637
                }
638
        }
639

640
        // reconcile LR policy
641
        if gw.Spec.TrafficPolicy == kubeovnv1.TrafficPolicyLocal {
×
642
                rules := make(map[string]string, len(nextHops))
×
643
                for nodeName, nexthop := range nextHops {
×
644
                        node, err := c.nodesLister.Get(nodeName)
×
645
                        if err != nil {
×
646
                                if k8serrors.IsNotFound(err) {
×
647
                                        continue
×
648
                                }
649
                                klog.Errorf("failed to get node %s: %v", nodeName, err)
×
650
                                return err
×
651
                        }
652
                        portName := node.Annotations[util.PortNameAnnotation]
×
653
                        if portName == "" {
×
654
                                err = fmt.Errorf("node %s does not have port name annotation", nodeName)
×
655
                                klog.Error(err)
×
656
                                return err
×
657
                        }
×
658
                        localPgName := strings.ReplaceAll(portName, "-", ".")
×
659
                        rules[fmt.Sprintf("ip%d.src == $%s_ip%d && ip%d.src == $%s_ip%d", af, localPgName, af, af, pgName, af)] = nexthop
×
660
                        rules[fmt.Sprintf("ip%d.src == $%s_ip%d && ip%d.src == $%s", af, localPgName, af, af, asName)] = nexthop
×
661
                }
662
                policies, err := c.OVNNbClient.ListLogicalRouterPolicies(lrName, util.EgressGatewayLocalPolicyPriority, externalIDs, false)
×
663
                if err != nil {
×
664
                        klog.Error(err)
×
665
                        return err
×
666
                }
×
667
                // update/delete existing policies
668
                for _, policy := range policies {
×
669
                        nexthop := rules[policy.Match]
×
670
                        bfdSessions := set.New(bfdMap[nexthop]).Delete("")
×
671
                        if nexthop == "" {
×
672
                                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(lrName, policy.UUID); err != nil {
×
673
                                        err = fmt.Errorf("failed to delete ovn lr policy %q: %w", policy.Match, err)
×
674
                                        klog.Error(err)
×
675
                                        return err
×
676
                                }
×
677
                        } else {
×
678
                                var changed bool
×
679
                                if len(policy.Nexthops) != 1 || policy.Nexthops[0] != nexthop {
×
680
                                        policy.Nexthops = []string{nexthop}
×
681
                                        changed = true
×
682
                                }
×
683
                                if !bfdSessions.Equal(set.New(policy.BFDSessions...)) {
×
684
                                        policy.BFDSessions = bfdSessions.UnsortedList()
×
685
                                        changed = true
×
686
                                }
×
687
                                if changed {
×
688
                                        if err = c.OVNNbClient.UpdateLogicalRouterPolicy(policy, &policy.Nexthops, &policy.BFDSessions); err != nil {
×
689
                                                err = fmt.Errorf("failed to update logical router policy %s: %w", policy.UUID, err)
×
690
                                                klog.Error(err)
×
691
                                                return err
×
692
                                        }
×
693
                                }
694
                        }
695
                        delete(rules, policy.Match)
×
696
                }
697
                // create new policies
698
                for match, nexthop := range rules {
×
699
                        if err = c.OVNNbClient.AddLogicalRouterPolicy(lrName, util.EgressGatewayLocalPolicyPriority, match,
×
700
                                ovnnb.LogicalRouterPolicyActionReroute, []string{nexthop}, []string{bfdMap[nexthop]}, externalIDs); err != nil {
×
701
                                klog.Error(err)
×
702
                                return err
×
703
                        }
×
704
                }
705
        } else {
×
706
                if err = c.OVNNbClient.DeleteLogicalRouterPolicies(lrName, util.EgressGatewayLocalPolicyPriority, externalIDs); err != nil {
×
707
                        klog.Error(err)
×
708
                        return err
×
709
                }
×
710
        }
711
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(lrName, util.EgressGatewayPolicyPriority, externalIDs, false)
×
712
        if err != nil {
×
713
                klog.Error(err)
×
714
                return err
×
715
        }
×
716
        matches := set.New(
×
717
                fmt.Sprintf("ip%d.src == $%s_ip%d", af, pgName, af),
×
718
                fmt.Sprintf("ip%d.src == $%s", af, asName),
×
719
        )
×
720
        bfdIPs := set.New(slices.Collect(maps.Values(nextHops))...)
×
721
        bfdSessions := bfdIDs.UnsortedList()
×
722
        for _, policy := range policies {
×
723
                if matches.Has(policy.Match) {
×
724
                        if !bfdIPs.Equal(set.New(policy.Nexthops...)) || !bfdIDs.Equal(set.New(policy.BFDSessions...)) {
×
725
                                policy.Nexthops, policy.BFDSessions = bfdIPs.UnsortedList(), bfdSessions
×
726
                                if err = c.OVNNbClient.UpdateLogicalRouterPolicy(policy, &policy.Nexthops, &policy.BFDSessions); err != nil {
×
727
                                        err = fmt.Errorf("failed to update bfd sessions of logical router policy %s: %w", policy.UUID, err)
×
728
                                        klog.Error(err)
×
729
                                        return err
×
730
                                }
×
731
                        }
732
                        matches.Delete(policy.Match)
×
733
                        continue
×
734
                }
735
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(lrName, policy.UUID); err != nil {
×
736
                        err = fmt.Errorf("failed to delete ovn lr policy %q: %w", policy.Match, err)
×
737
                        klog.Error(err)
×
738
                        return err
×
739
                }
×
740
        }
741
        for _, match := range matches.UnsortedList() {
×
742
                if err = c.OVNNbClient.AddLogicalRouterPolicy(lrName, util.EgressGatewayPolicyPriority, match,
×
743
                        ovnnb.LogicalRouterPolicyActionReroute, bfdIPs.UnsortedList(), bfdSessions, externalIDs); err != nil {
×
744
                        klog.Error(err)
×
745
                        return err
×
746
                }
×
747
        }
748

749
        return nil
×
750
}
751

752
func mergeNodeSelector(nodeSelector []kubeovnv1.VpcEgressGatewayNodeSelector) *corev1.NodeSelector {
×
753
        if len(nodeSelector) == 0 {
×
754
                return nil
×
755
        }
×
756

757
        result := &corev1.NodeSelector{
×
758
                NodeSelectorTerms: make([]corev1.NodeSelectorTerm, len(nodeSelector)),
×
759
        }
×
760
        for i, selector := range nodeSelector {
×
761
                result.NodeSelectorTerms[i] = corev1.NodeSelectorTerm{
×
762
                        MatchExpressions: make([]corev1.NodeSelectorRequirement, len(selector.MatchExpressions), len(selector.MatchLabels)+len(selector.MatchExpressions)),
×
763
                        MatchFields:      make([]corev1.NodeSelectorRequirement, len(selector.MatchFields)),
×
764
                }
×
765
                for j := range selector.MatchExpressions {
×
766
                        selector.MatchExpressions[j].DeepCopyInto(&result.NodeSelectorTerms[i].MatchExpressions[j])
×
767
                }
×
768
                for _, key := range slices.Sorted(maps.Keys(selector.MatchLabels)) {
×
769
                        result.NodeSelectorTerms[i].MatchExpressions = append(result.NodeSelectorTerms[i].MatchExpressions, corev1.NodeSelectorRequirement{
×
770
                                Key:      key,
×
771
                                Operator: corev1.NodeSelectorOpIn,
×
772
                                Values:   []string{selector.MatchLabels[key]},
×
773
                        })
×
774
                }
×
775
                for j := range selector.MatchFields {
×
776
                        selector.MatchFields[j].DeepCopyInto(&result.NodeSelectorTerms[i].MatchFields[j])
×
777
                }
×
778
        }
779

780
        return result
×
781
}
782

783
func vpcEgressGatewayInitContainerEnv(af int, internalGateway, externalGateway string, forwardSrc set.Set[string]) ([]corev1.EnvVar, error) {
×
784
        if internalGateway == "" {
×
785
                return nil, nil
×
786
        }
×
787

788
        return []corev1.EnvVar{{
×
789
                Name:  fmt.Sprintf("INTERNAL_GATEWAY_IPV%d", af),
×
790
                Value: internalGateway,
×
791
        }, {
×
792
                Name:  fmt.Sprintf("EXTERNAL_GATEWAY_IPV%d", af),
×
793
                Value: externalGateway,
×
794
        }, {
×
795
                Name:  fmt.Sprintf("NO_SNAT_SOURCES_IPV%d", af),
×
796
                Value: strings.Join(forwardSrc.SortedList(), ","),
×
797
        }}, nil
×
798
}
799

800
func vpcEgressGatewayContainerBFDD(image, bfdIP string, minTX, minRX, multiplier int32) corev1.Container {
×
801
        return corev1.Container{
×
802
                Name:            "bfdd",
×
803
                Image:           image,
×
804
                ImagePullPolicy: corev1.PullIfNotPresent,
×
805
                Command:         []string{"bash", "/kube-ovn/start-bfdd.sh"},
×
806
                Env: []corev1.EnvVar{{
×
807
                        Name: "POD_IPS",
×
808
                        ValueFrom: &corev1.EnvVarSource{
×
809
                                FieldRef: &corev1.ObjectFieldSelector{
×
810
                                        FieldPath: "status.podIPs",
×
811
                                },
×
812
                        },
×
813
                }, {
×
814
                        Name:  "BFD_PEER_IPS",
×
815
                        Value: bfdIP,
×
816
                }, {
×
817
                        Name:  "BFD_MIN_TX",
×
818
                        Value: strconv.Itoa(int(minTX)),
×
819
                }, {
×
820
                        Name:  "BFD_MIN_RX",
×
821
                        Value: strconv.Itoa(int(minRX)),
×
822
                }, {
×
823
                        Name:  "BFD_MULTI",
×
824
                        Value: strconv.Itoa(int(multiplier)),
×
825
                }},
×
826
                // wait for the BFD process to be running and initialize the BFD configuration
×
827
                StartupProbe: &corev1.Probe{
×
828
                        ProbeHandler: corev1.ProbeHandler{
×
829
                                Exec: &corev1.ExecAction{
×
830
                                        Command: []string{"bash", "/kube-ovn/bfdd-prestart.sh"},
×
831
                                },
×
832
                        },
×
833
                        InitialDelaySeconds: 1,
×
834
                        FailureThreshold:    1,
×
835
                },
×
836
                LivenessProbe: &corev1.Probe{
×
837
                        ProbeHandler: corev1.ProbeHandler{
×
838
                                Exec: &corev1.ExecAction{
×
839
                                        Command: []string{"bfdd-control", "status"},
×
840
                                },
×
841
                        },
×
842
                        InitialDelaySeconds: 1,
×
843
                        PeriodSeconds:       5,
×
844
                },
×
845
                ReadinessProbe: &corev1.Probe{
×
846
                        ProbeHandler: corev1.ProbeHandler{
×
847
                                Exec: &corev1.ExecAction{
×
848
                                        Command: []string{"bfdd-control", "status"},
×
849
                                },
×
850
                        },
×
851
                        InitialDelaySeconds: 3,
×
852
                        PeriodSeconds:       3,
×
853
                        FailureThreshold:    1,
×
854
                },
×
855
                SecurityContext: &corev1.SecurityContext{
×
856
                        Privileged: ptr.To(false),
×
857
                        RunAsUser:  ptr.To[int64](65534),
×
858
                        Capabilities: &corev1.Capabilities{
×
859
                                Add:  []corev1.Capability{"NET_ADMIN", "NET_BIND_SERVICE", "NET_RAW"},
×
860
                                Drop: []corev1.Capability{"ALL"},
×
861
                        },
×
862
                },
×
863
                VolumeMounts: []corev1.VolumeMount{{
×
864
                        Name:      "usr-local-sbin",
×
865
                        MountPath: "/usr/local/sbin",
×
866
                }},
×
867
        }
×
868
}
×
869

870
func (c *Controller) handleDelVpcEgressGateway(key string) error {
×
871
        ns, name, err := cache.SplitMetaNamespaceKey(key)
×
872
        if err != nil {
×
873
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
874
                return nil
×
875
        }
×
876

877
        c.vpcEgressGatewayKeyMutex.LockKey(key)
×
878
        defer func() { _ = c.vpcEgressGatewayKeyMutex.UnlockKey(key) }()
×
879

880
        cachedGateway, err := c.vpcEgressGatewayLister.VpcEgressGateways(ns).Get(name)
×
881
        if err != nil {
×
882
                if !k8serrors.IsNotFound(err) {
×
883
                        err = fmt.Errorf("failed to get vpc-egress-gateway %s: %w", key, err)
×
884
                        klog.Error(err)
×
885
                        return err
×
886
                }
×
887
                return nil
×
888
        }
889

890
        klog.Infof("handle deleting vpc-egress-gateway %s", key)
×
891
        if err = c.cleanOVNForVpcEgressGateway(key, cachedGateway.Spec.VPC); err != nil {
×
892
                klog.Error(err)
×
893
                return err
×
894
        }
×
895

896
        gw := cachedGateway.DeepCopy()
×
897
        if controllerutil.RemoveFinalizer(gw, util.KubeOVNControllerFinalizer) {
×
898
                if _, err = c.config.KubeOvnClient.KubeovnV1().VpcEgressGateways(gw.Namespace).
×
899
                        Update(context.Background(), gw, metav1.UpdateOptions{}); err != nil {
×
900
                        err = fmt.Errorf("failed to remove finalizer from vpc-egress-gateway %s: %w", key, err)
×
901
                        klog.Error(err)
×
902
                }
×
903
        }
904

905
        return nil
×
906
}
907

908
func (c *Controller) cleanOVNForVpcEgressGateway(key, lrName string) error {
×
909
        externalIDs := map[string]string{
×
910
                ovs.ExternalIDVendor:           util.CniTypeName,
×
911
                ovs.ExternalIDVpcEgressGateway: key,
×
912
        }
×
913

×
914
        bfdList, err := c.OVNNbClient.FindBFD(externalIDs)
×
915
        if err != nil {
×
916
                klog.Error(err)
×
917
                return err
×
918
        }
×
919
        for _, bfd := range bfdList {
×
920
                if err = c.OVNNbClient.DeleteBFD(bfd.UUID); err != nil {
×
921
                        klog.Error(err)
×
922
                        return err
×
923
                }
×
924
        }
925

926
        if lrName == "" {
×
927
                lrName = c.config.ClusterRouter
×
928
        }
×
929
        if err = c.OVNNbClient.DeleteLogicalRouterPolicies(lrName, -1, externalIDs); err != nil {
×
930
                klog.Error(err)
×
931
                return err
×
932
        }
×
933
        if err = c.OVNNbClient.DeletePortGroup(vegPortGroupName(key)); err != nil {
×
934
                klog.Error(err)
×
935
                return err
×
936
        }
×
937
        for _, af := range [...]int{4, 6} {
×
938
                if err = c.OVNNbClient.DeleteAddressSet(vegAddressSetName(key, af)); err != nil {
×
939
                        klog.Error(err)
×
940
                        return err
×
941
                }
×
942
        }
943

944
        return nil
×
945
}
946

947
func vegPortGroupName(key string) string {
×
948
        hash := util.Sha256Hash([]byte(key))
×
949
        return "VEG." + hash[:12]
×
950
}
×
951

952
func vegAddressSetName(key string, af int) string {
×
953
        hash := util.Sha256Hash([]byte(key))
×
954
        return fmt.Sprintf("VEG.%s.ipv%d", hash[:12], af)
×
955
}
×
956

957
func (c *Controller) handlePodEventForVpcEgressGateway(pod *corev1.Pod) error {
×
958
        if !pod.DeletionTimestamp.IsZero() || pod.Annotations[util.AllocatedAnnotation] != "true" {
×
959
                return nil
×
960
        }
×
961
        vpc := pod.Annotations[util.LogicalRouterAnnotation]
×
962
        if vpc == "" {
×
963
                return nil
×
964
        }
×
965

966
        ns, err := c.namespacesLister.Get(pod.Namespace)
×
967
        if err != nil {
×
968
                if k8serrors.IsNotFound(err) {
×
969
                        return nil
×
970
                }
×
971
                klog.Errorf("failed to get namespace %s: %v", pod.Namespace, err)
×
972
                utilruntime.HandleError(err)
×
973
                return err
×
974
        }
975

976
        gateways, err := c.vpcEgressGatewayLister.List(labels.Everything())
×
977
        if err != nil {
×
978
                klog.Errorf("failed to list vpc egress gateways: %v", err)
×
979
                utilruntime.HandleError(err)
×
980
                return err
×
981
        }
×
982

983
        for _, veg := range gateways {
×
984
                if veg.Spec.VPC != vpc {
×
985
                        continue
×
986
                }
987

988
                for _, selector := range veg.Spec.Selectors {
×
NEW
989
                        sel := labels.Everything()
×
NEW
990
                        if selector.NamespaceSelector != nil {
×
NEW
991
                                if sel, err = metav1.LabelSelectorAsSelector(selector.NamespaceSelector); err != nil {
×
NEW
992
                                        klog.Errorf("failed to create label selector for namespace selector %#v: %v", selector.NamespaceSelector, err)
×
NEW
993
                                        utilruntime.HandleError(err)
×
NEW
994
                                        continue
×
995
                                }
996
                        }
997
                        if !sel.Matches(labels.Set(ns.Labels)) {
×
998
                                continue
×
999
                        }
NEW
1000
                        if selector.PodSelector != nil {
×
NEW
1001
                                if sel, err = metav1.LabelSelectorAsSelector(selector.PodSelector); err != nil {
×
NEW
1002
                                        klog.Errorf("failed to create label selector for pod selector %#v: %v", selector.PodSelector, err)
×
NEW
1003
                                        utilruntime.HandleError(err)
×
NEW
1004
                                        continue
×
1005
                                }
1006
                        }
1007
                        if sel.Matches(labels.Set(pod.Labels)) {
×
1008
                                c.addOrUpdateVpcEgressGatewayQueue.Add(cache.MetaObjectToName(veg).String())
×
1009
                        }
×
1010
                }
1011
        }
1012
        return nil
×
1013
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc